From 5e992842cf178d8690f541ca22e01942c7635125 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Wed, 17 Aug 2016 19:47:25 +0200 Subject: [PATCH] Use Futures.transform instead of lazyTransform when possible --- .../com/google/cloud/logging/LoggingImpl.java | 42 +++++++++++-------- .../com/google/cloud/pubsub/PubSubImpl.java | 41 +++++++++++------- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/gcloud-java-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java b/gcloud-java-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java index 531105005b04..225aba62ced2 100644 --- a/gcloud-java-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java +++ b/gcloud-java-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java @@ -24,7 +24,6 @@ import static com.google.cloud.logging.Logging.WriteOption.OptionType.LABELS; import static com.google.cloud.logging.Logging.WriteOption.OptionType.LOG_NAME; import static com.google.cloud.logging.Logging.WriteOption.OptionType.RESOURCE; -import static com.google.common.util.concurrent.Futures.lazyTransform; import com.google.cloud.AsyncPage; import com.google.cloud.AsyncPageImpl; @@ -43,6 +42,8 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.logging.v2.CreateLogMetricRequest; import com.google.logging.v2.CreateSinkRequest; @@ -103,6 +104,14 @@ private static V get(Future future) { } } + private static Future transform(Future future, + Function function) { + if (future instanceof ListenableFuture) { + return Futures.transform((ListenableFuture) future, function); + } + return Futures.lazyTransform(future, function); + } + private abstract static class BasePageFetcher implements AsyncPageImpl.NextPageFetcher { private static final long serialVersionUID = 5095123855547444030L; @@ -198,7 +207,7 @@ public Future createAsync(SinkInfo sink) { .setParent(ConfigServiceV2Api.formatParentName(options().projectId())) .setSink(sink.toPb(options().projectId())) .build(); - return lazyTransform(rpc.create(request), Sink.fromPbFunction(this)); + return transform(rpc.create(request), Sink.fromPbFunction(this)); } @Override @@ -212,7 +221,7 @@ public Future updateAsync(SinkInfo sink) { .setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink.name())) .setSink(sink.toPb(options().projectId())) .build(); - return lazyTransform(rpc.update(request), Sink.fromPbFunction(this)); + return transform(rpc.update(request), Sink.fromPbFunction(this)); } @Override @@ -225,7 +234,7 @@ public Future getSinkAsync(String sink) { GetSinkRequest request = GetSinkRequest.newBuilder() .setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink)) .build(); - return lazyTransform(rpc.get(request), Sink.fromPbFunction(this)); + return transform(rpc.get(request), Sink.fromPbFunction(this)); } private static ListSinksRequest listSinksRequest(LoggingOptions serviceOptions, @@ -247,7 +256,7 @@ private static Future> listSinksAsync(final LoggingOptions servi final Map options) { final ListSinksRequest request = listSinksRequest(serviceOptions, options); Future list = serviceOptions.rpc().list(request); - return lazyTransform(list, new Function>() { + return transform(list, new Function>() { @Override public AsyncPage apply(ListSinksResponse listSinksResponse) { List sinks = listSinksResponse.getSinksList() == null ? ImmutableList.of() @@ -281,7 +290,7 @@ public Future deleteSinkAsync(String sink) { DeleteSinkRequest request = DeleteSinkRequest.newBuilder() .setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink)) .build(); - return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); + return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); } public boolean deleteLog(String log) { @@ -292,7 +301,7 @@ public Future deleteLogAsync(String log) { DeleteLogRequest request = DeleteLogRequest.newBuilder() .setLogName(LoggingServiceV2Api.formatLogName(options().projectId(), log)) .build(); - return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); + return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); } private static ListMonitoredResourceDescriptorsRequest listMonitoredResourceDescriptorsRequest( @@ -316,7 +325,7 @@ private static ListMonitoredResourceDescriptorsRequest listMonitoredResourceDesc final ListMonitoredResourceDescriptorsRequest request = listMonitoredResourceDescriptorsRequest(options); Future list = serviceOptions.rpc().list(request); - return lazyTransform(list, new Function>() { @Override public AsyncPage apply( @@ -355,7 +364,7 @@ public Future createAsync(MetricInfo metric) { .setParent(MetricsServiceV2Api.formatParentName(options().projectId())) .setMetric(metric.toPb()) .build(); - return lazyTransform(rpc.create(request), Metric.fromPbFunction(this)); + return transform(rpc.create(request), Metric.fromPbFunction(this)); } @Override @@ -369,7 +378,7 @@ public Future updateAsync(MetricInfo metric) { .setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric.name())) .setMetric(metric.toPb()) .build(); - return lazyTransform(rpc.update(request), Metric.fromPbFunction(this)); + return transform(rpc.update(request), Metric.fromPbFunction(this)); } @Override @@ -382,7 +391,7 @@ public Future getMetricAsync(String metric) { GetLogMetricRequest request = GetLogMetricRequest.newBuilder() .setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric)) .build(); - return lazyTransform(rpc.get(request), Metric.fromPbFunction(this)); + return transform(rpc.get(request), Metric.fromPbFunction(this)); } private static ListLogMetricsRequest listMetricsRequest(LoggingOptions serviceOptions, @@ -404,7 +413,7 @@ private static Future> listMetricsAsync(final LoggingOptions s final Map options) { final ListLogMetricsRequest request = listMetricsRequest(serviceOptions, options); Future list = serviceOptions.rpc().list(request); - return lazyTransform(list, new Function>() { + return transform(list, new Function>() { @Override public AsyncPage apply(ListLogMetricsResponse listMetricsResponse) { List metrics = listMetricsResponse.getMetricsList() == null @@ -438,7 +447,7 @@ public Future deleteMetricAsync(String metric) { DeleteLogMetricRequest request = DeleteLogMetricRequest.newBuilder() .setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric)) .build(); - return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); + return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); } private static WriteLogEntriesRequest writeLogEntriesRequest(LoggingOptions serviceOptions, @@ -466,9 +475,8 @@ public void write(Iterable logEntries, WriteOption... options) { } public Future writeAsync(Iterable logEntries, WriteOption... options) { - return lazyTransform( - rpc.write(writeLogEntriesRequest(options(), logEntries, optionMap(options))), - WRITE_RESPONSE_TO_VOID_FUNCTION); + return transform(rpc.write(writeLogEntriesRequest(options(), logEntries, optionMap(options))), + WRITE_RESPONSE_TO_VOID_FUNCTION); } private static ListLogEntriesRequest listLogEntriesRequest(LoggingOptions serviceOptions, @@ -498,7 +506,7 @@ private static Future> listLogEntriesAsync( final LoggingOptions serviceOptions, final Map options) { final ListLogEntriesRequest request = listLogEntriesRequest(serviceOptions, options); Future list = serviceOptions.rpc().list(request); - return lazyTransform(list, new Function>() { + return transform(list, new Function>() { @Override public AsyncPage apply(ListLogEntriesResponse listLogEntrysResponse) { List entries = listLogEntrysResponse.getEntriesList() == null diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 355d7c36aed1..88c87b56b0f7 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -21,7 +21,6 @@ import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY; import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.util.concurrent.Futures.lazyTransform; import com.google.cloud.AsyncPage; import com.google.cloud.AsyncPageImpl; @@ -40,6 +39,8 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; @@ -187,6 +188,14 @@ private static V get(Future future) { } } + private static Future transform(Future future, + Function function) { + if (future instanceof ListenableFuture) { + return Futures.transform((ListenableFuture) future, function); + } + return Futures.lazyTransform(future, function); + } + @Override public Topic create(TopicInfo topic) { return get(createAsync(topic)); @@ -194,7 +203,7 @@ public Topic create(TopicInfo topic) { @Override public Future createAsync(TopicInfo topic) { - return lazyTransform(rpc.create(topic.toPb(options().projectId())), Topic.fromPbFunction(this)); + return transform(rpc.create(topic.toPb(options().projectId())), Topic.fromPbFunction(this)); } @Override @@ -207,7 +216,7 @@ public Future getTopicAsync(String topic) { GetTopicRequest request = GetTopicRequest.newBuilder() .setTopic(PublisherApi.formatTopicName(options().projectId(), topic)) .build(); - return lazyTransform(rpc.get(request), Topic.fromPbFunction(this)); + return transform(rpc.get(request), Topic.fromPbFunction(this)); } @Override @@ -220,7 +229,7 @@ public Future deleteTopicAsync(String topic) { DeleteTopicRequest request = DeleteTopicRequest.newBuilder() .setTopic(PublisherApi.formatTopicName(options().projectId(), topic)) .build(); - return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); + return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); } private static ListTopicsRequest listTopicsRequest(PubSubOptions serviceOptions, @@ -242,7 +251,7 @@ private static Future> listTopicsAsync(final PubSubOptions serv final Map options) { final ListTopicsRequest request = listTopicsRequest(serviceOptions, options); Future list = serviceOptions.rpc().list(request); - return lazyTransform(list, new Function>() { + return transform(list, new Function>() { @Override public AsyncPage apply(ListTopicsResponse listTopicsResponse) { List topics = listTopicsResponse.getTopicsList() == null ? ImmutableList.of() @@ -281,7 +290,7 @@ private static PublishRequest publishRequest(PubSubOptions serviceOptions, Strin @Override public Future publishAsync(String topic, Message message) { - return lazyTransform( + return transform( rpc.publish(publishRequest(options(), topic, Collections.singletonList(message))), new Function() { @Override @@ -308,7 +317,7 @@ public List publish(String topic, Iterable messages) { @Override public Future> publishAsync(String topic, Iterable messages) { - return lazyTransform(rpc.publish(publishRequest(options(), topic, messages)), + return transform(rpc.publish(publishRequest(options(), topic, messages)), new Function>() { @Override public List apply(PublishResponse publishResponse) { @@ -324,7 +333,7 @@ public Subscription create(SubscriptionInfo subscription) { @Override public Future createAsync(SubscriptionInfo subscription) { - return lazyTransform(rpc.create(subscription.toPb(options().projectId())), + return transform(rpc.create(subscription.toPb(options().projectId())), Subscription.fromPbFunction(this)); } @@ -338,7 +347,7 @@ public Future getSubscriptionAsync(String subscription) { GetSubscriptionRequest request = GetSubscriptionRequest.newBuilder() .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) .build(); - return lazyTransform(rpc.get(request), Subscription.fromPbFunction(this)); + return transform(rpc.get(request), Subscription.fromPbFunction(this)); } @Override @@ -353,7 +362,7 @@ public Future replacePushConfigAsync(String subscription, PushConfig pushC .setPushConfig(pushConfig != null ? pushConfig.toPb() : com.google.pubsub.v1.PushConfig.getDefaultInstance()) .build(); - return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION); + return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION); } @Override @@ -366,7 +375,7 @@ public Future deleteSubscriptionAsync(String subscription) { DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder() .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) .build(); - return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); + return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION); } private static ListSubscriptionsRequest listSubscriptionsRequest(PubSubOptions serviceOptions, @@ -388,7 +397,7 @@ private static Future> listSubscriptionsAsync( final PubSubOptions serviceOptions, final Map options) { final ListSubscriptionsRequest request = listSubscriptionsRequest(serviceOptions, options); Future list = serviceOptions.rpc().list(request); - return lazyTransform(list, new Function>() { + return transform(list, new Function>() { @Override public AsyncPage apply(ListSubscriptionsResponse listSubscriptionsResponse) { List subscriptions = listSubscriptionsResponse.getSubscriptionsList() == null @@ -432,7 +441,7 @@ private static Future> listSubscriptionsAsync(final St final ListTopicSubscriptionsRequest request = listSubscriptionsRequest(topic, serviceOptions, options); Future list = serviceOptions.rpc().list(request); - return lazyTransform(list, + return transform(list, new Function>() { @Override public AsyncPage apply( @@ -493,7 +502,7 @@ public void failure(Throwable error) { // ignore } }); - return lazyTransform(future, new Function>() { + return transform(future, new Function>() { @Override public Iterator apply(PullResponse response) { return Iterators.transform(response.getReceivedMessagesList().iterator(), @@ -540,7 +549,7 @@ public Future ackAsync(String subscription, Iterable ackIds) { .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) .addAllAckIds(ackIds) .build(); - return lazyTransform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION); + return transform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION); } @Override @@ -589,7 +598,7 @@ public Future modifyAckDeadlineAsync(String subscription, int deadline, Ti .setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit)) .addAllAckIds(ackIds) .build(); - return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION); + return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION); } static Map optionMap(Option... options) {