From c4c6174498f54e65ca78437c6477307d498579fc Mon Sep 17 00:00:00 2001 From: David Torres Date: Wed, 22 Feb 2017 15:39:50 -0500 Subject: [PATCH 1/8] Enabling client-side compression in the library, with an option to disable it. #1639 --- .../spi/v1/PollingSubscriberConnection.java | 7 ++++- .../google/cloud/pubsub/spi/v1/Publisher.java | 26 ++++++++++++++++--- .../spi/v1/StreamingSubscriberConnection.java | 13 +++++++--- .../cloud/pubsub/spi/v1/Subscriber.java | 17 ++++++++++++ 4 files changed, 55 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 455bfbd37ba8..a253ab2fe259 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -74,13 +74,18 @@ public PollingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, + boolean compressionEnabled, ScheduledExecutorService executor, Clock clock) { this.subscription = subscription; this.executor = executor; - stub = + SubscriberFutureStub subscriberStub = SubscriberGrpc.newFutureStub(channel) .withCallCredentials(MoreCallCredentials.from(credentials)); + if (compressionEnabled) { + subscriberStub = subscriberStub.withCompression("gzip"); + } + stub = subscriberStub; messageDispatcher = new MessageDispatcher( receiver, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 7a002da09754..17f0179b679a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -39,6 +39,7 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import io.grpc.ManagedChannel; @@ -120,6 +121,8 @@ public class Publisher { private final FlowControlSettings flowControlSettings; private final boolean failOnFlowControlLimits; + + private final boolean compressionEnabled; private final Lock messagesBundleLock; private List messagesBundle; @@ -158,6 +161,8 @@ private Publisher(Builder builder) throws IOException { flowControlSettings = builder.flowControlSettings; failOnFlowControlLimits = builder.failOnFlowControlLimits; this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); + + compressionEnabled = builder.compressionEnabled; messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); @@ -382,10 +387,12 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) * Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBundle.attempt - 1)); rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().getMillis()); - Futures.addCallback( - PublisherGrpc.newFutureStub(channels[currentChannel]) - .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS) - .publish(publishRequest.build()), + PublisherFutureStub stub = PublisherGrpc.newFutureStub(channels[currentChannel]) + .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS); + if (compressionEnabled) { + stub = stub.withCompression("gzip"); + } + Futures.addCallback(stub.publish(publishRequest.build()), new FutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -623,6 +630,8 @@ public long nextLong(long least, long bound) { ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; + + boolean compressionEnabled = true; // client-side compression enabled by default private Builder(TopicName topic) { this.topicName = Preconditions.checkNotNull(topic); @@ -696,6 +705,15 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); return this; } + + /** + * Gives the ability to disable client-side compression. + * Note compression is enabled by default. + */ + public Builder setCompressionEnabled(boolean enabled) { + this.compressionEnabled = enabled; + return this; + } public Publisher build() throws IOException { return new Publisher(this); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 6b4b0ef495a0..57352bc7c43b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -60,6 +60,7 @@ final class StreamingSubscriberConnection extends AbstractService implements Ack private final Channel channel; private final Credentials credentials; + private final boolean compressionEnabled; private final String subscription; private final ScheduledExecutorService executor; @@ -75,6 +76,7 @@ public StreamingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, + boolean compressionEnabled, ScheduledExecutorService executor, Clock clock) { this.subscription = subscription; @@ -91,6 +93,7 @@ public StreamingSubscriberConnection( executor, clock); messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds); + this.compressionEnabled = compressionEnabled; } @Override @@ -148,12 +151,16 @@ private void initialize() { final SettableFuture errorFuture = SettableFuture.create(); final ClientResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture); + + CallOptions callOptions = + CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials)); + if (compressionEnabled) { + callOptions = callOptions.withCompression("gzip"); + } final ClientCallStreamObserver requestObserver = (ClientCallStreamObserver) (ClientCalls.asyncBidiStreamingCall( - channel.newCall( - SubscriberGrpc.METHOD_STREAMING_PULL, - CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))), + channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, callOptions), responseObserver)); logger.log( Level.INFO, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 946fe8aeef8f..56cf7eb69ea7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -24,6 +24,7 @@ import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; +import com.google.cloud.pubsub.spi.v1.Publisher.Builder; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -279,6 +280,7 @@ private static class SubscriberImpl extends AbstractService { private final List pollingSubscriberConnections; private final Clock clock; private final List closeables = new ArrayList<>(); + private final boolean compressionEnabled; private ScheduledFuture ackDeadlineUpdater; private int streamAckDeadlineSeconds; @@ -328,6 +330,8 @@ public void close() throws IOException { numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; streamingSubscriberConnections = new ArrayList(numChannels); pollingSubscriberConnections = new ArrayList(numChannels); + + compressionEnabled = builder.compressionEnabled; } @Override @@ -366,6 +370,7 @@ private void startStreamingConnections() { ackLatencyDistribution, channelBuilder.build(), flowController, + compressionEnabled, executor, clock)); } @@ -441,6 +446,7 @@ private void startPollingConnections() { ackLatencyDistribution, channelBuilder.build(), flowController, + compressionEnabled, executor, clock)); } @@ -546,6 +552,8 @@ public static final class Builder { Optional.absent(); Optional clock = Optional.absent(); + boolean compressionEnabled = true; // client-side compression enabled by default + Builder(SubscriptionName subscriptionName, MessageReceiver receiver) { this.subscriptionName = subscriptionName; this.receiver = receiver; @@ -609,6 +617,15 @@ Builder setClock(Clock clock) { this.clock = Optional.of(clock); return this; } + + /** + * Gives the ability to disable client-side compression. + * Note compression is enabled by default. + */ + public Builder setCompressionEnabled(boolean enabled) { + this.compressionEnabled = enabled; + return this; + } public Subscriber build() throws IOException { return new Subscriber(this); From d6a1043953ef5a801e7e9ccc65b436bda8a0e862 Mon Sep 17 00:00:00 2001 From: David Torres Date: Thu, 23 Feb 2017 10:13:38 -0500 Subject: [PATCH 2/8] Revert "Enabling client-side compression in the library, with an option to (#1645)". Compression is not fully supported in gRPC, can't have it in the library yet. This reverts commit a5999728f55f3657482a9b6fa671d432be56989f. --- .../spi/v1/PollingSubscriberConnection.java | 7 +---- .../google/cloud/pubsub/spi/v1/Publisher.java | 26 +++---------------- .../spi/v1/StreamingSubscriberConnection.java | 13 +++------- .../cloud/pubsub/spi/v1/Subscriber.java | 17 ------------ 4 files changed, 8 insertions(+), 55 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index a253ab2fe259..455bfbd37ba8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -74,18 +74,13 @@ public PollingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, - boolean compressionEnabled, ScheduledExecutorService executor, Clock clock) { this.subscription = subscription; this.executor = executor; - SubscriberFutureStub subscriberStub = + stub = SubscriberGrpc.newFutureStub(channel) .withCallCredentials(MoreCallCredentials.from(credentials)); - if (compressionEnabled) { - subscriberStub = subscriberStub.withCompression("gzip"); - } - stub = subscriberStub; messageDispatcher = new MessageDispatcher( receiver, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index c37fd637e103..e72cae3bd7a9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -39,7 +39,6 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; -import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import io.grpc.ManagedChannel; @@ -90,8 +89,6 @@ public class Publisher { private final FlowControlSettings flowControlSettings; private final boolean failOnFlowControlLimits; - - private final boolean compressionEnabled; private final Lock messagesBundleLock; private List messagesBundle; @@ -130,8 +127,6 @@ private Publisher(Builder builder) throws IOException { flowControlSettings = builder.flowControlSettings; failOnFlowControlLimits = builder.failOnFlowControlLimits; this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); - - compressionEnabled = builder.compressionEnabled; messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); @@ -373,12 +368,10 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) * Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBundle.attempt - 1)); rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().getMillis()); - PublisherFutureStub stub = PublisherGrpc.newFutureStub(channels[currentChannel]) - .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS); - if (compressionEnabled) { - stub = stub.withCompression("gzip"); - } - Futures.addCallback(stub.publish(publishRequest.build()), + Futures.addCallback( + PublisherGrpc.newFutureStub(channels[currentChannel]) + .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS) + .publish(publishRequest.build()), new FutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -633,8 +626,6 @@ public long nextLong(long least, long bound) { ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; - - boolean compressionEnabled = true; // client-side compression enabled by default private Builder(TopicName topic) { this.topicName = Preconditions.checkNotNull(topic); @@ -708,15 +699,6 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); return this; } - - /** - * Gives the ability to disable client-side compression. - * Note compression is enabled by default. - */ - public Builder setCompressionEnabled(boolean enabled) { - this.compressionEnabled = enabled; - return this; - } public Publisher build() throws IOException { return new Publisher(this); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 57352bc7c43b..6b4b0ef495a0 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -60,7 +60,6 @@ final class StreamingSubscriberConnection extends AbstractService implements Ack private final Channel channel; private final Credentials credentials; - private final boolean compressionEnabled; private final String subscription; private final ScheduledExecutorService executor; @@ -76,7 +75,6 @@ public StreamingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, - boolean compressionEnabled, ScheduledExecutorService executor, Clock clock) { this.subscription = subscription; @@ -93,7 +91,6 @@ public StreamingSubscriberConnection( executor, clock); messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds); - this.compressionEnabled = compressionEnabled; } @Override @@ -151,16 +148,12 @@ private void initialize() { final SettableFuture errorFuture = SettableFuture.create(); final ClientResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture); - - CallOptions callOptions = - CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials)); - if (compressionEnabled) { - callOptions = callOptions.withCompression("gzip"); - } final ClientCallStreamObserver requestObserver = (ClientCallStreamObserver) (ClientCalls.asyncBidiStreamingCall( - channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, callOptions), + channel.newCall( + SubscriberGrpc.METHOD_STREAMING_PULL, + CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))), responseObserver)); logger.log( Level.INFO, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 80898747316e..902226db408f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -24,7 +24,6 @@ import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; -import com.google.cloud.pubsub.spi.v1.Publisher.Builder; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -269,7 +268,6 @@ private static class SubscriberImpl extends AbstractService { private final List pollingSubscriberConnections; private final Clock clock; private final List closeables = new ArrayList<>(); - private final boolean compressionEnabled; private ScheduledFuture ackDeadlineUpdater; private int streamAckDeadlineSeconds; @@ -319,8 +317,6 @@ public void close() throws IOException { numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; streamingSubscriberConnections = new ArrayList(numChannels); pollingSubscriberConnections = new ArrayList(numChannels); - - compressionEnabled = builder.compressionEnabled; } @Override @@ -359,7 +355,6 @@ private void startStreamingConnections() { ackLatencyDistribution, channelBuilder.build(), flowController, - compressionEnabled, executor, clock)); } @@ -435,7 +430,6 @@ private void startPollingConnections() { ackLatencyDistribution, channelBuilder.build(), flowController, - compressionEnabled, executor, clock)); } @@ -541,8 +535,6 @@ public static final class Builder { Optional.absent(); Optional clock = Optional.absent(); - boolean compressionEnabled = true; // client-side compression enabled by default - Builder(SubscriptionName subscriptionName, MessageReceiver receiver) { this.subscriptionName = subscriptionName; this.receiver = receiver; @@ -606,15 +598,6 @@ Builder setClock(Clock clock) { this.clock = Optional.of(clock); return this; } - - /** - * Gives the ability to disable client-side compression. - * Note compression is enabled by default. - */ - public Builder setCompressionEnabled(boolean enabled) { - this.compressionEnabled = enabled; - return this; - } public Subscriber build() throws IOException { return new Subscriber(this); From ec14abeebf1202e8d681aa37b309145a0b82ed7f Mon Sep 17 00:00:00 2001 From: David Torres Date: Wed, 15 Mar 2017 16:45:46 -0400 Subject: [PATCH 3/8] Changing the AckReplyConsumer interface to comply to just the Java 8 Consumer interface, it really is no useful to be able to set an exception as ack reply, since the result is the same as nack, if we ever require another result then we can just add one more value to the AckReply enum. Also adding a fail-safe catch so if the receiver ever throws an exception we will interpret that as a nack and keep going. --- .../cloud/pubsub/spi/v1/AckReplyConsumer.java | 4 +-- .../pubsub/spi/v1/MessageDispatcher.java | 14 ++++---- .../spi/v1/FakeSubscriberServiceImpl.java | 7 ++-- .../pubsub/spi/v1/SubscriberImplTest.java | 34 ++++++++++--------- 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java index 952a24f5ce0d..8e7ff3837ac5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java @@ -19,9 +19,9 @@ /** * Accepts a reply, sending it to the service. * - *

Both the interface and its method is named after the Java 8's {@code BiConsumer} interface + *

Both the interface and its method is named after the Java 8's {@code Consumer} interface * to make migration to Java 8 and adopting its patterns easier. */ public interface AckReplyConsumer { - void accept(AckReply ackReply, Throwable t); + void accept(AckReply ackReply); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 7067d8944cbd..b38ea9c10128 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -285,12 +285,8 @@ public void processReceivedMessages(List r final AckReplyConsumer consumer = new AckReplyConsumer() { @Override - public void accept(AckReply reply, Throwable t) { - if (reply != null) { - response.set(reply); - } else { - response.setException(t); - } + public void accept(AckReply reply) { + response.set(reply); } }; Futures.addCallback(response, ackHandler); @@ -298,7 +294,11 @@ public void accept(AckReply reply, Throwable t) { new Runnable() { @Override public void run() { - receiver.receiveMessage(message, consumer); + try { + receiver.receiveMessage(message, consumer); + } catch (Exception e) { + response.setException(e); + } } }); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java index 55f472b3569d..2f0ed34423fe 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakeSubscriberServiceImpl.java @@ -49,7 +49,8 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase { private final AtomicBoolean subscriptionInitialized = new AtomicBoolean(false); private String subscription = ""; - private final AtomicInteger messageAckDeadline = new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS); + private final AtomicInteger messageAckDeadline = + new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS); private final List openedStreams = new ArrayList<>(); private final List closedStreams = new ArrayList<>(); private final List acks = new ArrayList<>(); @@ -235,7 +236,9 @@ public void getSubscription( @Override public void pull(PullRequest request, StreamObserver responseObserver) { - receivedPullRequest.add(request); + synchronized (receivedPullRequest) { + receivedPullRequest.add(request); + } try { responseObserver.onNext(pullResponses.take()); responseObserver.onCompleted(); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 02186b2a7adc..61156cb93fc9 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -84,14 +84,14 @@ static class TestReceiver implements MessageReceiver { new LinkedBlockingQueue<>(); private AckReply ackReply = AckReply.ACK; private Optional messageCountLatch = Optional.absent(); - private Optional error = Optional.absent(); + private Optional error = Optional.absent(); private boolean explicitAckReplies; void setReply(AckReply ackReply) { this.ackReply = ackReply; } - void setErrorReply(Throwable error) { + void setErrorReply(RuntimeException error) { this.error = Optional.of(error); } @@ -111,18 +111,20 @@ void waitForExpectedMessages() throws InterruptedException { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { - if (explicitAckReplies) { - try { - outstandingMessageReplies.put(consumer); - } catch (InterruptedException e) { - throw new IllegalStateException(e); + try { + if (explicitAckReplies) { + try { + outstandingMessageReplies.put(consumer); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } else { + replyTo(consumer); + } + } finally { + if (messageCountLatch.isPresent()) { + messageCountLatch.get().countDown(); } - } else { - replyTo(consumer); - } - - if (messageCountLatch.isPresent()) { - messageCountLatch.get().countDown(); } } @@ -145,9 +147,9 @@ public void replyAllOutstandingMessage() { private void replyTo(AckReplyConsumer reply) { if (error.isPresent()) { - reply.accept(null, error.get()); + throw error.get(); } else { - reply.accept(ackReply, null); + reply.accept(ackReply); } } } @@ -207,7 +209,7 @@ public void testNackSingleMessage() throws Exception { @Test public void testReceiverError_NacksMessage() throws Exception { - testReceiver.setErrorReply(new Exception("Can't process message")); + testReceiver.setErrorReply(new RuntimeException("Can't process message")); Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver)); From 3e906e698e9b82f787c23307976b7153c56c2a6a Mon Sep 17 00:00:00 2001 From: davidtorres Date: Mon, 20 Mar 2017 14:54:27 -0400 Subject: [PATCH 4/8] Example fixes to comply with changes to the AckReplyConsumer interface. --- .../pubsub/snippets/CreateSubscriptionAndPullMessages.java | 2 +- .../examples/pubsub/snippets/MessageReceiverSnippets.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java index ef55f6a7ffee..531de1cc7fbf 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java @@ -46,7 +46,7 @@ public static void main(String... args) throws Exception { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { System.out.println("got message: " + message.getData().toStringUtf8()); - consumer.accept(AckReply.ACK, null); + consumer.accept(AckReply.ACK); } }; Subscriber subscriber = null; diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java index 160980558e92..d315c1e0709f 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java @@ -51,9 +51,9 @@ public MessageReceiver messageReceiver() { MessageReceiver receiver = new MessageReceiver() { public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { if (blockingQueue.offer(message)) { - consumer.accept(AckReply.ACK, null); + consumer.accept(AckReply.ACK); } else { - consumer.accept(AckReply.NACK, null); + consumer.accept(AckReply.NACK); } } }; From 32af932e014724badf2ecdcab4237c21adf51231 Mon Sep 17 00:00:00 2001 From: David Torres Date: Wed, 29 Mar 2017 17:53:09 -0400 Subject: [PATCH 5/8] Allowing for setting a maximum for message lease extensions. This prevents the library extending messages infinitevely if the user code has lost track of the ack consumer handle. --- .../pubsub/spi/v1/MessageDispatcher.java | 23 +++++++++++++++++-- .../spi/v1/PollingSubscriberConnection.java | 2 ++ .../spi/v1/StreamingSubscriberConnection.java | 2 ++ .../cloud/pubsub/spi/v1/Subscriber.java | 23 ++++++++++++++++++- .../pubsub/spi/v1/SubscriberImplTest.java | 10 ++++++-- 5 files changed, 55 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index b38ea9c10128..98c86675ef18 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -61,6 +61,7 @@ class MessageDispatcher { private final Clock clock; private final Duration ackExpirationPadding; + private final Duration maxAckExtensionPeriod; private final MessageReceiver receiver; private final AckProcessor ackProcessor; @@ -88,12 +89,17 @@ class MessageDispatcher { // The hashcode and equals methods are explicitly not implemented to discourage // the use of this class as keys in maps or similar containers. private static class ExtensionJob implements Comparable { + Instant creation; Instant expiration; int nextExtensionSeconds; ArrayList ackHandlers; ExtensionJob( - Instant expiration, int initialAckDeadlineExtension, ArrayList ackHandlers) { + Instant creation, + Instant expiration, + int initialAckDeadlineExtension, + ArrayList ackHandlers) { + this.creation = creation; this.expiration = expiration; nextExtensionSeconds = initialAckDeadlineExtension; this.ackHandlers = ackHandlers; @@ -217,12 +223,14 @@ void sendAckOperations( MessageReceiver receiver, AckProcessor ackProcessor, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, FlowController flowController, ScheduledExecutorService executor, Clock clock) { this.executor = executor; this.ackExpirationPadding = ackExpirationPadding; + this.maxAckExtensionPeriod = maxAckExtensionPeriod; this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; @@ -305,7 +313,11 @@ public void run() { synchronized (outstandingAckHandlers) { outstandingAckHandlers.add( - new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); + new ExtensionJob( + new Instant(clock.millis()), + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + ackHandlers)); } setupNextAckDeadlineExtensionAlarm(expiration); @@ -380,6 +392,13 @@ public void run() { && outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) { ExtensionJob job = outstandingAckHandlers.poll(); + if (maxAckExtensionPeriod.getMillis() > 0 + && job.creation.plus(maxAckExtensionPeriod).compareTo(now) <= 0) { + // The job has expired, according to the maxAckExtensionPeriod, we are just going to + // drop it. + continue; + } + // If a message has already been acked, remove it, nothing to do. for (int i = 0; i < job.ackHandlers.size(); ) { if (job.ackHandlers.get(i).acked.get()) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 33b216b73329..ff7a28c33734 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -71,6 +71,7 @@ public PollingSubscriberConnection( Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, Channel channel, FlowController flowController, @@ -86,6 +87,7 @@ public PollingSubscriberConnection( receiver, this, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, flowController, executor, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 3927b0142844..821083f7754d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -72,6 +72,7 @@ public StreamingSubscriberConnection( Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, int streamAckDeadlineSeconds, Distribution ackLatencyDistribution, Channel channel, @@ -87,6 +88,7 @@ public StreamingSubscriberConnection( receiver, this, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, flowController, executor, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 4e20a1af4b23..8c1874dc87dd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -256,6 +256,7 @@ private static class SubscriberImpl extends AbstractService { private final String cachedSubscriptionNameString; private final FlowControlSettings flowControlSettings; private final Duration ackExpirationPadding; + private final Duration maxAckExtensionPeriod; private final ScheduledExecutorService executor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); @@ -277,6 +278,7 @@ private SubscriberImpl(Builder builder) throws IOException { subscriptionName = builder.subscriptionName; cachedSubscriptionNameString = subscriptionName.toString(); ackExpirationPadding = builder.ackExpirationPadding; + maxAckExtensionPeriod = builder.maxAckExtensionPeriod; streamAckDeadlineSeconds = Math.max( INITIAL_ACK_DEADLINE_SECONDS, @@ -351,6 +353,7 @@ private void startStreamingConnections() { credentials, receiver, ackExpirationPadding, + maxAckExtensionPeriod, streamAckDeadlineSeconds, ackLatencyDistribution, channelBuilder.build(), @@ -427,6 +430,7 @@ private void startPollingConnections() { credentials, receiver, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, channelBuilder.build(), flowController, @@ -516,7 +520,8 @@ public void run() { public static final class Builder { private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100); private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500); - + private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.standardMinutes(60); + static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() .setExecutorThreadCount( @@ -530,6 +535,7 @@ public static final class Builder { MessageReceiver receiver; Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; + Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance(); @@ -590,6 +596,21 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) { return this; } + /** + * Set the maximum period a message ack deadline will be extended. + * + *

It is recommended to set this value to a reasonable upper bound of the subscriber time to + * process any message. This maximum period avoids messages to be locked by a subscriber + * in cases when the {@link AckReply} is lost. + * + *

A zero duration effectively disables auto deadline extensions. + */ + public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + Preconditions.checkArgument(maxAckExtensionPeriod.getMillis() >= 0); + this.maxAckExtensionPeriod = maxAckExtensionPeriod; + return this; + } + /** Gives the ability to set a custom executor. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 61156cb93fc9..408362966b7b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsub.spi.v1.MessageDispatcher.PENDING_ACKS_SEND_DELAY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; @@ -276,8 +277,8 @@ public void testModifyAckDeadline() throws Exception { Subscriber subscriber = startSubscriber( getTestSubscriberBuilder(testReceiver) - .setAckExpirationPadding(Duration.standardSeconds(1))); - + .setAckExpirationPadding(Duration.standardSeconds(1)) + .setMaxAckExtensionPeriod(Duration.standardSeconds(12))); // Send messages to be acked List testAckIdsBundle = ImmutableList.of("A", "B", "C"); testReceiver.setExplicitAck(true); @@ -309,6 +310,11 @@ public ModifyAckDeadline apply(String ack) { } }); + // No more modify ack deadline extension should be triggered at this point + fakeExecutor.advanceTime(Duration.standardSeconds(20)); + + assertTrue(fakeSubscriberServiceImpl.getModifyAckDeadlines().isEmpty()); + testReceiver.replyAllOutstandingMessage(); subscriber.stopAsync().awaitTerminated(); } From 7779e375d67d8a9dd0740599c24cbe269bb42ce0 Mon Sep 17 00:00:00 2001 From: David Torres Date: Thu, 30 Mar 2017 13:19:28 -0400 Subject: [PATCH 6/8] Setting so the max ack deadline duration is always respected even when the next extension surpasses it. --- .../com/google/cloud/pubsub/spi/v1/MessageDispatcher.java | 6 ++++-- .../com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 98c86675ef18..c408e6204720 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -88,7 +88,7 @@ class MessageDispatcher { // it is not modified while inside the queue. // The hashcode and equals methods are explicitly not implemented to discourage // the use of this class as keys in maps or similar containers. - private static class ExtensionJob implements Comparable { + private class ExtensionJob implements Comparable { Instant creation; Instant expiration; int nextExtensionSeconds; @@ -106,7 +106,9 @@ private static class ExtensionJob implements Comparable { } void extendExpiration(Instant now) { - expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds)); + Instant possibleExtension = now.plus(Duration.standardSeconds(nextExtensionSeconds)); + Instant maxExtension = creation.plus(maxAckExtensionPeriod); + expiration = possibleExtension.isBefore(maxExtension) ? possibleExtension : maxExtension; nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 408362966b7b..9422c3f559cc 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -278,7 +278,7 @@ public void testModifyAckDeadline() throws Exception { startSubscriber( getTestSubscriberBuilder(testReceiver) .setAckExpirationPadding(Duration.standardSeconds(1)) - .setMaxAckExtensionPeriod(Duration.standardSeconds(12))); + .setMaxAckExtensionPeriod(Duration.standardSeconds(13))); // Send messages to be acked List testAckIdsBundle = ImmutableList.of("A", "B", "C"); testReceiver.setExplicitAck(true); @@ -306,7 +306,9 @@ public ModifyAckDeadline apply(String ack) { new Function() { @Override public ModifyAckDeadline apply(String ack) { - return new ModifyAckDeadline(ack, 4); + return new ModifyAckDeadline(ack, 2); // It is expected that the deadline is renewed + // only two more seconds to not pass the max + // ack deadline ext. } }); From 3d3a166e693678b352c7e8a7d96b685a03eb2eab Mon Sep 17 00:00:00 2001 From: David Torres Date: Thu, 6 Apr 2017 11:38:32 -0400 Subject: [PATCH 7/8] Fix a merge/compilation error with clock, clock.millis() -> clock.millisTime() --- .../com/google/cloud/pubsub/spi/v1/MessageDispatcher.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 2bf541666498..59f2716e52f2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -315,7 +315,11 @@ public void run() { synchronized (outstandingAckHandlers) { outstandingAckHandlers.add( - new ExtensionJob(new Instant(clock.millis()), expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); + new ExtensionJob( + new Instant(clock.millisTime()), + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + ackHandlers)); } setupNextAckDeadlineExtensionAlarm(expiration); From b555c975c0488d204ecdd4a9c709bf166c84d746 Mon Sep 17 00:00:00 2001 From: David Torres Date: Fri, 7 Apr 2017 13:10:22 -0400 Subject: [PATCH 8/8] Decouple the processing of new received messages from the dispatching to user code - Separated the handling of batched received messages from the per-message dispatching of message to the user code. In order to accomplish this I had to keep the batch in memory while the processing of each message will draw from the in-memory batch until completely depleted. - Drawing flow controller permits on a per message basis (used to try to draw permits for the whole batch potentially deadlocking the whole subscriber), this addresses the deadlock condition raised here https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1868 - No longer using blocking flow controller, instead pausing and resuming pulls/streamed-messages based on the flow controller feedback and when new permits become available. - A separate executor for alarms (2 threads in it has showed up to scale pretty well with many subscriber, given our ack operations are pretty lightweight) - Setting the maximum of messages to pull per request based on the number requested by the user in the flow controller (if any), this in a best effort addresses https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1868 Fixes #1868, fixes #1865 and fixes #1855 --- .../pubsub/spi/v1/MessageDispatcher.java | 178 +++++++++++++----- .../spi/v1/PollingSubscriberConnection.java | 23 ++- .../spi/v1/StreamingSubscriberConnection.java | 16 +- .../cloud/pubsub/spi/v1/Subscriber.java | 10 +- 4 files changed, 168 insertions(+), 59 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 59f2716e52f2..dc468e59e77e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,9 +16,11 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.FlowController; import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.FlowControlException; import com.google.api.stats.Distribution; +import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; @@ -29,11 +31,13 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -56,6 +60,9 @@ class MessageDispatcher { private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2; @VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100); private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m + + private static final ScheduledExecutorService alarmsExecutor = + Executors.newScheduledThreadPool(2); private final ScheduledExecutorService executor; private final ApiClock clock; @@ -184,6 +191,7 @@ public void onFailure(Throwable t) { setupPendingAcksAlarm(); flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } @Override @@ -194,25 +202,23 @@ public void onSuccess(AckReply reply) { synchronized (pendingAcks) { pendingAcks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); // Record the latency rounded to the next closest integer. ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D))); - messagesWaiter.incrementPendingMessages(-1); - return; + break; case NACK: synchronized (pendingNacks) { pendingNacks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); - messagesWaiter.incrementPendingMessages(-1); - return; + break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); } + setupPendingAcksAlarm(); + flowController.release(1, outstandingBytes); + messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } } @@ -269,28 +275,117 @@ public int getMessageDeadlineSeconds() { return messageDeadlineSeconds; } - public void processReceivedMessages(List responseMessages) { - int receivedMessagesCount = responseMessages.size(); - if (receivedMessagesCount == 0) { + static class OutstandingMessagesBatch { + static class OutstandingMessage { + private final com.google.pubsub.v1.ReceivedMessage receivedMessage; + private final AckHandler ackHandler; + + public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.receivedMessage = receivedMessage; + this.ackHandler = ackHandler; + } + + public com.google.pubsub.v1.ReceivedMessage receivedMessage() { + return receivedMessage; + } + + public AckHandler ackHandler() { + return ackHandler; + } + } + + private final Deque messages; + private final Runnable doneCallback; + + public OutstandingMessagesBatch(Runnable doneCallback) { + this.messages = new LinkedList<>(); + this.doneCallback = doneCallback; + } + + public void addMessage( + com.google.pubsub.v1.ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.messages.add(new OutstandingMessage(receivedMessage, ackHandler)); + } + + public Deque messages() { + return messages; + } + + public void done() { + doneCallback.run(); + } + } + + Deque outstandingMessageBatches = new LinkedList<>(); + + public void processReceivedMessages( + List messages, Runnable doneCallback) { + if (messages.size() == 0) { + doneCallback.run(); return; } - Instant now = new Instant(clock.millisTime()); - int totalByteCount = 0; - final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); - for (ReceivedMessage pubsubMessage : responseMessages) { - int messageSize = pubsubMessage.getMessage().getSerializedSize(); - totalByteCount += messageSize; - ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); + messagesWaiter.incrementPendingMessages(messages.size()); + + OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback); + final ArrayList ackHandlers = new ArrayList<>(messages.size()); + for (ReceivedMessage message : messages) { + AckHandler ackHandler = + new AckHandler(message.getAckId(), message.getMessage().getSerializedSize()); + ackHandlers.add(ackHandler); + outstandingBatch.addMessage(message, ackHandler); } - Instant expiration = now.plus(messageDeadlineSeconds * 1000); - logger.log( - Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now}); - - messagesWaiter.incrementPendingMessages(responseMessages.size()); - Iterator acksIterator = ackHandlers.iterator(); - for (ReceivedMessage userMessage : responseMessages) { - final PubsubMessage message = userMessage.getMessage(); - final AckHandler ackHandler = acksIterator.next(); + + Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000); + synchronized (outstandingAckHandlers) { + outstandingAckHandlers.add( + new ExtensionJob( + new Instant(clock.millisTime()), + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + ackHandlers)); + } + setupNextAckDeadlineExtensionAlarm(expiration); + + synchronized (outstandingMessageBatches) { + outstandingMessageBatches.add(outstandingBatch); + } + processOutstandingBatches(); + } + + public void processOutstandingBatches() { + while (true) { + boolean batchDone = false; + Runnable batchCallback = null; + OutstandingMessage outstandingMessage; + synchronized (outstandingMessageBatches) { + OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek(); + if (nextBatch == null) { + return; + } + outstandingMessage = nextBatch.messages.peek(); + if (outstandingMessage == null) { + return; + } + try { + // This is a non-blocking flow controller. + flowController.reserve( + 1, outstandingMessage.receivedMessage().getMessage().getSerializedSize()); + } catch (FlowController.MaxOutstandingElementCountReachedException + | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { + return; + } catch (FlowControlException unexpectedException) { + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + } + nextBatch.messages.poll(); // We got a hold to the message already. + batchDone = nextBatch.messages.isEmpty(); + if (batchDone) { + outstandingMessageBatches.poll(); + batchCallback = nextBatch.doneCallback; + } + } + + final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); + final AckHandler ackHandler = outstandingMessage.ackHandler(); final SettableFuture response = SettableFuture.create(); final AckReplyConsumer consumer = new AckReplyConsumer() { @@ -311,22 +406,9 @@ public void run() { } } }); - } - - synchronized (outstandingAckHandlers) { - outstandingAckHandlers.add( - new ExtensionJob( - new Instant(clock.millisTime()), - expiration, - INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, - ackHandlers)); - } - setupNextAckDeadlineExtensionAlarm(expiration); - - try { - flowController.reserve(receivedMessagesCount, totalByteCount); - } catch (FlowController.FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + if (batchDone) { + batchCallback.run(); + } } } @@ -335,7 +417,7 @@ private void setupPendingAcksAlarm() { try { if (pendingAcksAlarm == null) { pendingAcksAlarm = - executor.schedule( + alarmsExecutor.schedule( new Runnable() { @Override public void run() { @@ -400,7 +482,7 @@ public void run() { // drop it. continue; } - + // If a message has already been acked, remove it, nothing to do. for (int i = 0; i < job.ackHandlers.size(); ) { if (job.ackHandlers.get(i).acked.get()) { @@ -464,7 +546,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) { nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime; ackDeadlineExtensionAlarm = - executor.schedule( + alarmsExecutor.schedule( new AckDeadlineAlarm(), nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(), TimeUnit.MILLISECONDS); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index ce960b683877..94ef9bec5ee8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; /** @@ -61,6 +62,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac private final ScheduledExecutorService executor; private final SubscriberFutureStub stub; private final MessageDispatcher messageDispatcher; + private final int maxDesiredPulledMessages; public PollingSubscriberConnection( String subscription, @@ -71,6 +73,7 @@ public PollingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, + @Nullable Integer maxDesiredPulledMessages, ScheduledExecutorService executor, ApiClock clock) { this.subscription = subscription; @@ -89,6 +92,8 @@ public PollingSubscriberConnection( executor, clock); messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); + this.maxDesiredPulledMessages = + maxDesiredPulledMessages != null ? maxDesiredPulledMessages : DEFAULT_MAX_MESSAGES; } @Override @@ -117,7 +122,8 @@ public void onSuccess(Subscription result) { public void onFailure(Throwable cause) { notifyFailed(cause); } - }); + }, + executor); } @Override @@ -132,7 +138,7 @@ private void pullMessages(final Duration backoff) { .pull( PullRequest.newBuilder() .setSubscription(subscription) - .setMaxMessages(DEFAULT_MAX_MESSAGES) + .setMaxMessages(maxDesiredPulledMessages) .setReturnImmediately(true) .build()); @@ -141,7 +147,6 @@ private void pullMessages(final Duration backoff) { new FutureCallback() { @Override public void onSuccess(PullResponse pullResponse) { - messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList()); if (pullResponse.getReceivedMessagesCount() == 0) { // No messages in response, possibly caught up in backlog, we backoff to avoid // slamming the server. @@ -160,7 +165,14 @@ public void run() { TimeUnit.MILLISECONDS); return; } - pullMessages(INITIAL_BACKOFF); + messageDispatcher.processReceivedMessages( + pullResponse.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + pullMessages(INITIAL_BACKOFF); + } + }); } @Override @@ -190,7 +202,8 @@ public void run() { notifyFailed(cause); } } - }); + }, + executor); } private boolean isAlive() { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 4be3745b6251..903faaf03e2f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -125,11 +125,17 @@ public void beforeStart(ClientCallStreamObserver requestOb @Override public void onNext(StreamingPullResponse response) { - messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); - // Only if not shutdown we will request one more batches of messages to be delivered. - if (isAlive()) { - requestObserver.request(1); - } + messageDispatcher.processReceivedMessages( + response.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + // Only if not shutdown we will request one more batches of messages to be delivered. + if (isAlive()) { + requestObserver.request(1); + } + } + }); } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 4d37999d8c82..e367548b6902 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -22,6 +22,7 @@ import com.google.api.gax.core.CurrentMillisClock; import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; @@ -121,7 +122,13 @@ private Subscriber(Builder builder) throws IOException { Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); - flowController = new FlowController(builder.flowControlSettings); + flowController = + new FlowController( + builder + .flowControlSettings + .toBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { @@ -324,6 +331,7 @@ private void startPollingConnections() { ackLatencyDistribution, channelBuilder.build(), flowController, + flowControlSettings.getMaxOutstandingElementCount(), executor, clock)); }