8000 Use Futures.transform instead of lazyTransform when possible by mziccard · Pull Request #1168 · googleapis/google-cloud-java · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8000
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static com.google.cloud.logging.Logging.WriteOption.OptionType.LABELS;
import static com.google.cloud.logging.Logging.WriteOption.OptionType.LOG_NAME;
import static com.google.cloud.logging.Logging.WriteOption.OptionType.RESOURCE;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
import com.google.cloud.AsyncPageImpl;
Expand All @@ -43,6 +42,8 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.logging.v2.CreateLogMetricRequest;
import com.google.logging.v2.CreateSinkRequest;
Expand Down Expand Up @@ -103,6 +104,14 @@ private static <V> V get(Future<V> future) {
}
}

private static <I, O> Future<O> transform(Future<I> future,
Function<? super I, ? extends O> function) {
if (future instanceof ListenableFuture) {
return Futures.transform((ListenableFuture<I>) future, function);
}
return Futures.lazyTransform(future, function);
}

private abstract static class BasePageFetcher<T> implements AsyncPageImpl.NextPageFetcher<T> {

private static final long serialVersionUID = 5095123855547444030L;
Expand Down Expand Up @@ -198,7 +207,7 @@ public Future<Sink> createAsync(SinkInfo sink) {
.setParent(ConfigServiceV2Api.formatParentName(options().projectId()))
.setSink(sink.toPb(options().projectId()))
.build();
return lazyTransform(rpc.create(request), Sink.fromPbFunction(this));
return transform(rpc.create(request), Sink.fromPbFunction(this));
}

@Override
Expand All @@ -212,7 +221,7 @@ public Future<Sink> updateAsync(SinkInfo sink) {
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink.name()))
.setSink(sink.toPb(options().projectId()))
.build();
return lazyTransform(rpc.update(request), Sink.fromPbFunction(this));
return transform(rpc.update(request), Sink.fromPbFunction(this));
}

@Override
Expand All @@ -225,7 +234,7 @@ public Future<Sink> getSinkAsync(String sink) {
GetSinkRequest request = GetSinkRequest.newBuilder()
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
.build();
return lazyTransform(rpc.get(request), Sink.fromPbFunction(this));
return transform(rpc.get(request), Sink.fromPbFunction(this));
}

private static ListSinksRequest listSinksRequest(LoggingOptions serviceOptions,
Expand All @@ -247,7 +256,7 @@ private static Future<AsyncPage<Sink>> listSinksAsync(final LoggingOptions servi
final Map<Option.OptionType, ?> options) {
final ListSinksRequest request = listSinksRequest(serviceOptions, options);
Future<ListSinksResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list, new Function<ListSinksResponse, AsyncPage<Sink>>() {
return transform(list, new Function<ListSinksResponse, AsyncPage<Sink>>() {
@Override
public AsyncPage<Sink> apply(ListSinksResponse listSinksResponse) {
List<Sink> sinks = listSinksResponse.getSinksList() == null ? ImmutableList.<Sink>of()
Expand Down Expand Up @@ -281,7 +290,7 @@ public Future<Boolean> deleteSinkAsync(String sink) {
DeleteSinkRequest request = DeleteSinkRequest.newBuilder()
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
.build();
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
}

public boolean deleteLog(String log) {
Expand All @@ -292,7 +301,7 @@ public Future<Boolean> deleteLogAsync(String log) {
DeleteLogRequest request = DeleteLogRequest.newBuilder()
.setLogName(LoggingServiceV2Api.formatLogName(options().projectId(), log))
.build();
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
}

private static ListMonitoredResourceDescriptorsRequest listMonitoredResourceDescriptorsRequest(
Expand All @@ -316,7 +325,7 @@ private static ListMonitoredResourceDescriptorsRequest listMonitoredResourceDesc
final ListMonitoredResourceDescriptorsRequest request =
listMonitoredResourceDescriptorsRequest(options);
Future<ListMonitoredResourceDescriptorsResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list, new Function<ListMonitoredResourceDescriptorsResponse,
return transform(list, new Function<ListMonitoredResourceDescriptorsResponse,
AsyncPage<MonitoredResourceDescriptor>>() {
@Override
public AsyncPage<MonitoredResourceDescriptor> apply(
Expand Down Expand Up @@ -355,7 +364,7 @@ public Future<Metric> createAsync(MetricInfo metric) {
.setParent(MetricsServiceV2Api.formatParentName(options().projectId()))
.setMetric(metric.toPb())
.build();
return lazyTransform(rpc.create(request), Metric.fromPbFunction(this));
return transform(rpc.create(request), Metric.fromPbFunction(this));
}

@Override
Expand All @@ -369,7 +378,7 @@ public Future<Metric> updateAsync(MetricInfo metric) {
.setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric.name()))
.setMetric(metric.toPb())
.build();
return lazyTransform(rpc.update(request), Metric.fromPbFunction(this));
return transform(rpc.update(request), Metric.fromPbFunction(this));
}

@Override
Expand All @@ -382,7 +391,7 @@ public Future<Metric> getMetricAsync(String metric) {
GetLogMetricRequest request = GetLogMetricRequest.newBuilder()
.setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric))
.build();
return lazyTransform(rpc.get(request), Metric.fromPbFunction(this));
return transform(rpc.get(request), Metric.fromPbFunction(this));
}

private static ListLogMetricsRequest listMetricsRequest(LoggingOptions serviceOptions,
Expand All @@ -404,7 +413,7 @@ private static Future<AsyncPage<Metric>> listMetricsAsync(final LoggingOptions s
final Map<Option.OptionType, ?> options) {
final ListLogMetricsRequest request = listMetricsRequest(serviceOptions, options);
Future<ListLogMetricsResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list, new Function<ListLogMetricsResponse, AsyncPage<Metric>>() {
return transform(list, new Function<ListLogMetricsResponse, AsyncPage<Metric>>() {
@Override
public AsyncPage<Metric> apply(ListLogMetricsResponse listMetricsResponse) {
List<Metric> metrics = listMetricsResponse.getMetricsList() == null
Expand Down Expand Up @@ -438,7 +447,7 @@ public Future<Boolean> deleteMetricAsync(String metric) {
DeleteLogMetricRequest request = DeleteLogMetricRequest.newBuilder()
.setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric))
.build();
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
}

private static WriteLogEntriesRequest writeLogEntriesRequest(LoggingOptions serviceOptions,
Expand Down Expand Up @@ -466,9 +475,8 @@ public void write(Iterable<LogEntry> logEntries, WriteOption... options) {
}

public Future<Void> writeAsync(Iterable<LogEntry> logEntries, WriteOption... options) {
return lazyTransform(
rpc.write(writeLogEntriesRequest(options(), logEntries, optionMap(options))),
WRITE_RESPONSE_TO_VOID_FUNCTION);
return transform(rpc.write(writeLogEntriesRequest(options(), logEntries, optionMap(options))),
WRITE_RESPONSE_TO_VOID_FUNCTION);
}

private static ListLogEntriesRequest listLogEntriesRequest(LoggingOptions serviceOptions,
Expand Down Expand Up @@ -498,7 +506,7 @@ private static Future<AsyncPage<LogEntry>> listLogEntriesAsync(
final LoggingOptions serviceOptions, final Map<Option.OptionType, ?> options) {
final ListLogEntriesRequest request = listLogEntriesRequest(serviceOptions, options);
Future<ListLogEntriesResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list, new Function<ListLogEntriesResponse, AsyncPage<LogEntry>>() {
return transform(list, new Function<ListLogEntriesResponse, AsyncPage<LogEntry>>() {
@Override
public AsyncPage<LogEntry> apply(ListLogEntriesResponse listLogEntrysResponse) {
List<LogEntry> entries = listLogEntrysResponse.getEntriesList() == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY;
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
import com.google.cloud.AsyncPageImpl;
Expand All @@ -40,6 +39,8 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
Expand Down Expand Up @@ -187,14 +188,22 @@ private static <V> V get(Future<V> future) {
}
}

private static <I, O> Future<O> transform(Future<I> future,
Function<? super I, ? extends O> function) {
if (future instanceof ListenableFuture) {
return Futures.transform((ListenableFuture<I>) future, function);
}
return Futures.lazyTransform(future, function);
}

@Override
public Topic create(TopicInfo topic) {
return get(createAsync(topic));
}

@Override
public Future<Topic> createAsync(TopicInfo topic) {
return lazyTransform(rpc.create(topic.toPb(options().projectId())), Topic.fromPbFunction(this));
return transform(rpc.create(topic.toPb(options().projectId())), Topic.fromPbFunction(this));
}

@Override
Expand All @@ -207,7 +216,7 @@ public Future<Topic> getTopicAsync(String topic) {
GetTopicRequest request = GetTopicRequest.newBuilder()
.setTopic(PublisherApi.formatTopicName(options().projectId(), topic))
.build();
return lazyTransform(rpc.get(request), Topic.fromPbFunction(this));
return transform(rpc.get(request), Topic.fromPbFunction(this));
}

@Override
Expand All @@ -220,7 +229,7 @@ public Future<Boolean> deleteTopicAsync(String topic) {
DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
.setTopic(PublisherApi.formatTopicName(options().projectId(), topic))
.build();
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
}

private static ListTopicsRequest listTopicsRequest(PubSubOptions serviceOptions,
Expand All @@ -242,7 +251,7 @@ private static Future<AsyncPage<Topic>> listTopicsAsync(final PubSubOptions serv
final Map<Option.OptionType, ?> options) {
final ListTopicsRequest request = listTopicsRequest(serviceOptions, options);
Future<ListTopicsResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list, new Function<ListTopicsResponse, AsyncPage<Topic>>() {
return transform(list, new Function<ListTopicsResponse, AsyncPage<Topic>>() {
@Override
public AsyncPage<Topic> apply(ListTopicsResponse listTopicsResponse) {
List<Topic> topics = listTopicsResponse.getTopicsList() == null ? ImmutableList.<Topic>of()
Expand Down Expand Up @@ -281,7 +290,7 @@ private static PublishRequest publishRequest(PubSubOptions serviceOptions, Strin

@Override
public Future<String> publishAsync(String topic, Message message) {
return lazyTransform(
return transform(
rpc.publish(publishRequest(options(), topic, Collections.singletonList(message))),
new Function<PublishResponse, String>() {
@Override
Expand All @@ -308,7 +317,7 @@ public List<String> publish(String topic, Iterable<Message> messages) {

@Override
public Future<List<String>> publishAsync(String topic, Iterable<Message> messages) {
return lazyTransform(rpc.publish(publishRequest(options(), topic, messages)),
return transform(rpc.publish(publishRequest(options(), topic, messages)),
new Function<PublishResponse, List<String>>() {
@Override
public List<String> apply(PublishResponse publishResponse) {
Expand All @@ -324,7 +333,7 @@ public Subscription create(SubscriptionInfo subscription) {

@Override
public Future<Subscription> createAsync(SubscriptionInfo subscription) {
return lazyTransform(rpc.create(subscription.toPb(options().projectId())),
return transform(rpc.create(subscription.toPb(options().projectId())),
Subscription.fromPbFunction(this));
}

Expand All @@ -338,7 +347,7 @@ public Future<Subscription> getSubscriptionAsync(String subscription) {
GetSubscriptionRequest request = GetSubscriptionRequest.newBuilder()
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.build();
return lazyTransform(rpc.get(request), Subscription.fromPbFunction(this));
return transform(rpc.get(request), Subscription.fromPbFunction(this));
}

@Override
Expand All @@ -353,7 +362,7 @@ public Future<Void> replacePushConfigAsync(String subscription, PushConfig pushC
.setPushConfig(pushConfig != null ? pushConfig.toPb()
: com.google.pubsub.v1.PushConfig.getDefaultInstance())
.build();
return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
}

@Override
Expand All @@ -366,7 +375,7 @@ public Future<Boolean> deleteSubscriptionAsync(String subscription) {
DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder()
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.build();
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
}

private static ListSubscriptionsRequest listSubscriptionsRequest(PubSubOptions serviceOptions,
Expand All @@ -388,7 +397,7 @@ private static Future<AsyncPage<Subscription>> listSubscriptionsAsync(
final PubSubOptions serviceOptions, final Map<Option.OptionType, ?> options) {
final ListSubscriptionsRequest request = listSubscriptionsRequest(serviceOptions, options);
Future<ListSubscriptionsResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list, new Function<ListSubscriptionsResponse, AsyncPage<Subscription>>() {
return transform(list, new Function<ListSubscriptionsResponse, AsyncPage<Subscription>>() {
@Override
public AsyncPage<Subscription> apply(ListSubscriptionsResponse listSubscriptionsResponse) {
List<Subscription> subscriptions = listSubscriptionsResponse.getSubscriptionsList() == null
Expand Down Expand Up @@ -432,7 +441,7 @@ private static Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(final St
final ListTopicSubscriptionsRequest request =
listSubscriptionsRequest(topic, serviceOptions, options);
Future<ListTopicSubscriptionsResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list,
return transform(list,
new Function<ListTopicSubscriptionsResponse, AsyncPage<SubscriptionId>>() {
@Override
public AsyncPage<SubscriptionId> apply(
Expand Down Expand Up @@ -493,7 +502,7 @@ public void failure(Throwable error) {
// ignore
}
});
return lazyTransform(future, new Function<PullResponse, Iterator<ReceivedMessage>>() {
return transform(future, new Function<PullResponse, Iterator<ReceivedMessage>>() {
@Override
public Iterator<ReceivedMessage> apply(PullResponse response) {
return Iterators.transform(response.getReceivedMessagesList().iterator(),
Expand Down Expand Up @@ -540,7 +549,7 @@ public Future<Void> ackAsync(String subscription, Iterable<String> ackIds) {
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.addAllAckIds(ackIds)
.build();
return lazyTransform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION);
return transform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION);
}

@Override
Expand Down Expand Up @@ -589,7 +598,7 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti
.setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit))
.addAllAckIds(ackIds)
.build();
return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
}

static <T extends Option.OptionType> Map<Option.OptionType, ?> optionMap(Option... options) {
Expand Down
0