diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index b6b7318e30c7..dc468e59e77e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,9 +16,11 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.FlowController; import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.FlowControlException; import com.google.api.stats.Distribution; +import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; @@ -29,11 +31,13 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -56,11 +60,15 @@ class MessageDispatcher { private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2; @VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100); private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m + + private static final ScheduledExecutorService alarmsExecutor = + Executors.newScheduledThreadPool(2); private final ScheduledExecutorService executor; private final ApiClock clock; private final Duration ackExpirationPadding; + private final Duration maxAckExtensionPeriod; private final MessageReceiver receiver; private final AckProcessor ackProcessor; @@ -87,20 +95,27 @@ class MessageDispatcher { // it is not modified while inside the queue. // The hashcode and equals methods are explicitly not implemented to discourage // the use of this class as keys in maps or similar containers. - private static class ExtensionJob implements Comparable { + private class ExtensionJob implements Comparable { + Instant creation; Instant expiration; int nextExtensionSeconds; ArrayList ackHandlers; ExtensionJob( - Instant expiration, int initialAckDeadlineExtension, ArrayList ackHandlers) { + Instant creation, + Instant expiration, + int initialAckDeadlineExtension, + ArrayList ackHandlers) { + this.creation = creation; this.expiration = expiration; nextExtensionSeconds = initialAckDeadlineExtension; this.ackHandlers = ackHandlers; } void extendExpiration(Instant now) { - expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds)); + Instant possibleExtension = now.plus(Duration.standardSeconds(nextExtensionSeconds)); + Instant maxExtension = creation.plus(maxAckExtensionPeriod); + expiration = possibleExtension.isBefore(maxExtension) ? possibleExtension : maxExtension; nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS); } @@ -176,6 +191,7 @@ public void onFailure(Throwable t) { setupPendingAcksAlarm(); flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } @Override @@ -186,25 +202,23 @@ public void onSuccess(AckReply reply) { synchronized (pendingAcks) { pendingAcks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); // Record the latency rounded to the next closest integer. ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D))); - messagesWaiter.incrementPendingMessages(-1); - return; + break; case NACK: synchronized (pendingNacks) { pendingNacks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); - messagesWaiter.incrementPendingMessages(-1); - return; + break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); } + setupPendingAcksAlarm(); + flowController.release(1, outstandingBytes); + messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } } @@ -217,12 +231,14 @@ void sendAckOperations( MessageReceiver receiver, AckProcessor ackProcessor, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, FlowController flowController, ScheduledExecutorService executor, ApiClock clock) { this.executor = executor; this.ackExpirationPadding = ackExpirationPadding; + this.maxAckExtensionPeriod = maxAckExtensionPeriod; this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; @@ -259,28 +275,117 @@ public int getMessageDeadlineSeconds() { return messageDeadlineSeconds; } - public void processReceivedMessages(List responseMessages) { - int receivedMessagesCount = responseMessages.size(); - if (receivedMessagesCount == 0) { + static class OutstandingMessagesBatch { + static class OutstandingMessage { + private final com.google.pubsub.v1.ReceivedMessage receivedMessage; + private final AckHandler ackHandler; + + public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.receivedMessage = receivedMessage; + this.ackHandler = ackHandler; + } + + public com.google.pubsub.v1.ReceivedMessage receivedMessage() { + return receivedMessage; + } + + public AckHandler ackHandler() { + return ackHandler; + } + } + + private final Deque messages; + private final Runnable doneCallback; + + public OutstandingMessagesBatch(Runnable doneCallback) { + this.messages = new LinkedList<>(); + this.doneCallback = doneCallback; + } + + public void addMessage( + com.google.pubsub.v1.ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.messages.add(new OutstandingMessage(receivedMessage, ackHandler)); + } + + public Deque messages() { + return messages; + } + + public void done() { + doneCallback.run(); + } + } + + Deque outstandingMessageBatches = new LinkedList<>(); + + public void processReceivedMessages( + List messages, Runnable doneCallback) { + if (messages.size() == 0) { + doneCallback.run(); return; } - Instant now = new Instant(clock.millisTime()); - int totalByteCount = 0; - final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); - for (ReceivedMessage pubsubMessage : responseMessages) { - int messageSize = pubsubMessage.getMessage().getSerializedSize(); - totalByteCount += messageSize; - ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); + messagesWaiter.incrementPendingMessages(messages.size()); + + OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback); + final ArrayList ackHandlers = new ArrayList<>(messages.size()); + for (ReceivedMessage message : messages) { + AckHandler ackHandler = + new AckHandler(message.getAckId(), message.getMessage().getSerializedSize()); + ackHandlers.add(ackHandler); + outstandingBatch.addMessage(message, ackHandler); } - Instant expiration = now.plus(messageDeadlineSeconds * 1000); - logger.log( - Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now}); - - messagesWaiter.incrementPendingMessages(responseMessages.size()); - Iterator acksIterator = ackHandlers.iterator(); - for (ReceivedMessage userMessage : responseMessages) { - final PubsubMessage message = userMessage.getMessage(); - final AckHandler ackHandler = acksIterator.next(); + + Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000); + synchronized (outstandingAckHandlers) { + outstandingAckHandlers.add( + new ExtensionJob( + new Instant(clock.millisTime()), + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + ackHandlers)); + } + setupNextAckDeadlineExtensionAlarm(expiration); + + synchronized (outstandingMessageBatches) { + outstandingMessageBatches.add(outstandingBatch); + } + processOutstandingBatches(); + } + + public void processOutstandingBatches() { + while (true) { + boolean batchDone = false; + Runnable batchCallback = null; + OutstandingMessage outstandingMessage; + synchronized (outstandingMessageBatches) { + OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek(); + if (nextBatch == null) { + return; + } + outstandingMessage = nextBatch.messages.peek(); + if (outstandingMessage == null) { + return; + } + try { + // This is a non-blocking flow controller. + flowController.reserve( + 1, outstandingMessage.receivedMessage().getMessage().getSerializedSize()); + } catch (FlowController.MaxOutstandingElementCountReachedException + | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { + return; + } catch (FlowControlException unexpectedException) { + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + } + nextBatch.messages.poll(); // We got a hold to the message already. + batchDone = nextBatch.messages.isEmpty(); + if (batchDone) { + outstandingMessageBatches.poll(); + batchCallback = nextBatch.doneCallback; + } + } + + final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); + final AckHandler ackHandler = outstandingMessage.ackHandler(); final SettableFuture response = SettableFuture.create(); final AckReplyConsumer consumer = new AckReplyConsumer() { @@ -301,18 +406,9 @@ public void run() { } } }); - } - - synchronized (outstandingAckHandlers) { - outstandingAckHandlers.add( - new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); - } - setupNextAckDeadlineExtensionAlarm(expiration); - - try { - flowController.reserve(receivedMessagesCount, totalByteCount); - } catch (FlowController.FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + if (batchDone) { + batchCallback.run(); + } } } @@ -321,7 +417,7 @@ private void setupPendingAcksAlarm() { try { if (pendingAcksAlarm == null) { pendingAcksAlarm = - executor.schedule( + alarmsExecutor.schedule( new Runnable() { @Override public void run() { @@ -380,6 +476,13 @@ public void run() { && outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) { ExtensionJob job = outstandingAckHandlers.poll(); + if (maxAckExtensionPeriod.getMillis() > 0 + && job.creation.plus(maxAckExtensionPeriod).compareTo(now) <= 0) { + // The job has expired, according to the maxAckExtensionPeriod, we are just going to + // drop it. + continue; + } + // If a message has already been acked, remove it, nothing to do. for (int i = 0; i < job.ackHandlers.size(); ) { if (job.ackHandlers.get(i).acked.get()) { @@ -443,7 +546,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) { nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime; ackDeadlineExtensionAlarm = - executor.schedule( + alarmsExecutor.schedule( new AckDeadlineAlarm(), nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(), TimeUnit.MILLISECONDS); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 5f979490979a..94ef9bec5ee8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; /** @@ -61,15 +62,18 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac private final ScheduledExecutorService executor; private final SubscriberFutureStub stub; private final MessageDispatcher messageDispatcher; + private final int maxDesiredPulledMessages; public PollingSubscriberConnection( String subscription, Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, Channel channel, FlowController flowController, + @Nullable Integer maxDesiredPulledMessages, ScheduledExecutorService executor, ApiClock clock) { this.subscription = subscription; @@ -82,11 +86,14 @@ public PollingSubscriberConnection( receiver, this, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, flowController, executor, clock); messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); + this.maxDesiredPulledMessages = + maxDesiredPulledMessages != null ? maxDesiredPulledMessages : DEFAULT_MAX_MESSAGES; } @Override @@ -115,7 +122,8 @@ public void onSuccess(Subscription result) { public void onFailure(Throwable cause) { notifyFailed(cause); } - }); + }, + executor); } @Override @@ -130,7 +138,7 @@ private void pullMessages(final Duration backoff) { .pull( PullRequest.newBuilder() .setSubscription(subscription) - .setMaxMessages(DEFAULT_MAX_MESSAGES) + .setMaxMessages(maxDesiredPulledMessages) .setReturnImmediately(true) .build()); @@ -139,7 +147,6 @@ private void pullMessages(final Duration backoff) { new FutureCallback() { @Override public void onSuccess(PullResponse pullResponse) { - messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList()); if (pullResponse.getReceivedMessagesCount() == 0) { // No messages in response, possibly caught up in backlog, we backoff to avoid // slamming the server. @@ -158,7 +165,14 @@ public void run() { TimeUnit.MILLISECONDS); return; } - pullMessages(INITIAL_BACKOFF); + messageDispatcher.processReceivedMessages( + pullResponse.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + pullMessages(INITIAL_BACKOFF); + } + }); } @Override @@ -188,7 +202,8 @@ public void run() { notifyFailed(cause); } } - }); + }, + executor); } private boolean isAlive() { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index a85fad6b0cce..903faaf03e2f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -70,6 +70,7 @@ public StreamingSubscriberConnection( Credentials credentials, MessageReceiver receiver, Duration ackExpirationPadding, + Duration maxAckExtensionPeriod, int streamAckDeadlineSeconds, Distribution ackLatencyDistribution, Channel channel, @@ -85,6 +86,7 @@ public StreamingSubscriberConnection( receiver, this, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, flowController, executor, @@ -123,11 +125,17 @@ public void beforeStart(ClientCallStreamObserver requestOb @Override public void onNext(StreamingPullResponse response) { - messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); - // Only if not shutdown we will request one more batches of messages to be delivered. - if (isAlive()) { - requestObserver.request(1); - } + messageDispatcher.processReceivedMessages( + response.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + // Only if not shutdown we will request one more batches of messages to be delivered. + if (isAlive()) { + requestObserver.request(1); + } + } + }); } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index b6c311730d82..e367548b6902 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -22,6 +22,7 @@ import com.google.api.gax.core.CurrentMillisClock; import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; @@ -92,6 +93,7 @@ public class Subscriber extends AbstractApiService { private final String cachedSubscriptionNameString; private final FlowControlSettings flowControlSettings; private final Duration ackExpirationPadding; + private final Duration maxAckExtensionPeriod; private final ScheduledExecutorService executor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); @@ -113,13 +115,20 @@ private Subscriber(Builder builder) throws IOException { subscriptionName = builder.subscriptionName; cachedSubscriptionNameString = subscriptionName.toString(); ackExpirationPadding = builder.ackExpirationPadding; + maxAckExtensionPeriod = builder.maxAckExtensionPeriod; streamAckDeadlineSeconds = Math.max( INITIAL_ACK_DEADLINE_SECONDS, Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); - flowController = new FlowController(builder.flowControlSettings); + flowController = + new FlowController( + builder + .flowControlSettings + .toBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { @@ -241,6 +250,7 @@ private void startStreamingConnections() { credentials, receiver, ackExpirationPadding, + maxAckExtensionPeriod, streamAckDeadlineSeconds, ackLatencyDistribution, channelBuilder.build(), @@ -317,9 +327,11 @@ private void startPollingConnections() { credentials, receiver, ackExpirationPadding, + maxAckExtensionPeriod, ackLatencyDistribution, channelBuilder.build(), flowController, + flowControlSettings.getMaxOutstandingElementCount(), executor, clock)); } @@ -405,6 +417,7 @@ public void run() { public static final class Builder { private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100); private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500); + private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.standardMinutes(60); static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() @@ -419,6 +432,7 @@ public static final class Builder { MessageReceiver receiver; Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; + Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance(); @@ -479,6 +493,21 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) { return this; } + /** + * Set the maximum period a message ack deadline will be extended. + * + *

It is recommended to set this value to a reasonable upper bound of the subscriber time to + * process any message. This maximum period avoids messages to be locked by a subscriber + * in cases when the {@link AckReply} is lost. + * + *

A zero duration effectively disables auto deadline extensions. + */ + public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + Preconditions.checkArgument(maxAckExtensionPeriod.getMillis() >= 0); + this.maxAckExtensionPeriod = maxAckExtensionPeriod; + return this; + } + /** Gives the ability to set a custom executor. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); @@ -496,3 +525,4 @@ public Subscriber build() throws IOException { } } } + diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 19df0b265fb7..58b8c74d96e1 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsub.spi.v1.MessageDispatcher.PENDING_ACKS_SEND_DELAY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.api.gax.grpc.FixedExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; @@ -276,8 +277,8 @@ public void testModifyAckDeadline() throws Exception { Subscriber subscriber = startSubscriber( getTestSubscriberBuilder(testReceiver) - .setAckExpirationPadding(Duration.standardSeconds(1))); - + .setAckExpirationPadding(Duration.standardSeconds(1)) + .setMaxAckExtensionPeriod(Duration.standardSeconds(13))); // Send messages to be acked List testAckIdsBatch = ImmutableList.of("A", "B", "C"); testReceiver.setExplicitAck(true); @@ -305,10 +306,17 @@ public ModifyAckDeadline apply(String ack) { new Function() { @Override public ModifyAckDeadline apply(String ack) { - return new ModifyAckDeadline(ack, 4); + return new ModifyAckDeadline(ack, 2); // It is expected that the deadline is renewed + // only two more seconds to not pass the max + // ack deadline ext. } }); + // No more modify ack deadline extension should be triggered at this point + fakeExecutor.advanceTime(Duration.standardSeconds(20)); + + assertTrue(fakeSubscriberServiceImpl.getModifyAckDeadlines().isEmpty()); + testReceiver.replyAllOutstandingMessage(); subscriber.stopAsync().awaitTerminated(); }