diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 83fca0296fdf..799770bad761 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -28,12 +28,11 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -68,8 +67,7 @@ class MessageDispatcher { private final FlowController flowController; private final MessagesWaiter messagesWaiter; - // Map of outstanding messages (value) ordered by expiration time (key) in ascending order. - private final Map> outstandingAckHandlers; + private final PriorityQueue outstandingAckHandlers; private final Set pendingAcks; private final Set pendingNacks; @@ -82,40 +80,43 @@ class MessageDispatcher { // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; - private static class ExpirationInfo implements Comparable { - private final Clock clock; + // ExtensionJob represents a group of {@code AckHandler}s that shares the same expiration. + // + // It is Comparable so that it may be put in a PriorityQueue. + // For efficiency, it is also mutable, so great care should be taken to make sure + // it is not modified while inside the queue. + // The hashcode and equals methods are explicitly not implemented to discourage + // the use of this class as keys in maps or similar containers. + private static class ExtensionJob implements Comparable { Instant expiration; int nextExtensionSeconds; + ArrayList ackHandlers; - ExpirationInfo(Clock clock, Instant expiration, int initialAckDeadlineExtension) { - this.clock = clock; + ExtensionJob( + Instant expiration, int initialAckDeadlineExtension, ArrayList ackHandlers) { this.expiration = expiration; nextExtensionSeconds = initialAckDeadlineExtension; + this.ackHandlers = ackHandlers; } - void extendExpiration() { - expiration = new Instant(clock.millis()).plus(Duration.standardSeconds(nextExtensionSeconds)); + void extendExpiration(Instant now) { + expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds)); nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS); } @Override - public int hashCode() { - return expiration.hashCode(); + public int compareTo(ExtensionJob other) { + return expiration.compareTo(other.expiration); } - @Override - public boolean equals(Object obj) { - if (!(obj instanceof ExpirationInfo)) { - return false; + public String toString() { + ArrayList ackIds = new ArrayList<>(); + for (AckHandler ah : ackHandlers) { + ackIds.add(ah.ackId); } - - ExpirationInfo other = (ExpirationInfo) obj; - return expiration.equals(other.expiration); - } - - @Override - public int compareTo(ExpirationInfo other) { - return expiration.compareTo(other.expiration); + return String.format( + "ExtensionJob {expiration: %s, nextExtensionSeconds: %d, ackIds: %s}", + expiration, nextExtensionSeconds, ackIds); } } @@ -137,6 +138,12 @@ static class PendingModifyAckDeadline { public void addAckId(String ackId) { ackIds.add(ackId); } + + public String toString() { + return String.format( + "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}", + deadlineExtensionSeconds, ackIds); + } } /** @@ -217,7 +224,7 @@ void sendAckOperations( this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; - outstandingAckHandlers = new HashMap<>(); + outstandingAckHandlers = new PriorityQueue<>(); pendingAcks = new HashSet<>(); pendingNacks = new HashSet<>(); // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS @@ -257,18 +264,13 @@ public void processReceivedMessages(List r } Instant now = new Instant(clock.millis()); int totalByteCount = 0; - final List ackHandlers = new ArrayList<>(responseMessages.size()); + final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); for (ReceivedMessage pubsubMessage : responseMessages) { int messageSize = pubsubMessage.getMessage().getSerializedSize(); totalByteCount += messageSize; ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); } - ExpirationInfo expiration = - new ExpirationInfo( - clock, now.plus(messageDeadlineSeconds * 1000), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS); - synchronized (outstandingAckHandlers) { - addOutstadingAckHandlers(expiration, ackHandlers); - } + Instant expiration = now.plus(messageDeadlineSeconds * 1000); logger.debug("Received {} messages at {}", responseMessages.size(), now); setupNextAckDeadlineExtensionAlarm(expiration); @@ -285,6 +287,17 @@ public void run() { } }); } + + // There is a race condition. setupNextAckDeadlineExtensionAlarm might set + // an alarm that fires before this block can run. + // The fix is to move setup below this block, but doing so aggravates another + // race condition. + // TODO(pongad): Fix both races. + synchronized (outstandingAckHandlers) { + outstandingAckHandlers.add( + new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); + } + try { flowController.reserve(receivedMessagesCount, totalByteCount); } catch (FlowController.FlowControlException unexpectedException) { @@ -292,14 +305,6 @@ public void run() { } } - private void addOutstadingAckHandlers( - ExpirationInfo expiration, final List ackHandlers) { - if (!outstandingAckHandlers.containsKey(expiration)) { - outstandingAckHandlers.put(expiration, new ArrayList(ackHandlers.size())); - } - outstandingAckHandlers.get(expiration).addAll(ackHandlers); - } - private void setupPendingAcksAlarm() { alarmsLock.lock(); try { @@ -354,41 +359,49 @@ public void run() { now, cutOverTime, ackExpirationPadding); - ExpirationInfo nextScheduleExpiration = null; + Instant nextScheduleExpiration = null; List modifyAckDeadlinesToSend = new ArrayList<>(); + // Holding area for jobs we'll put back into the queue + // so we don't process the same job twice. + List renewJobs = new ArrayList<>(); + synchronized (outstandingAckHandlers) { - for (ExpirationInfo messageExpiration : outstandingAckHandlers.keySet()) { - if (messageExpiration.expiration.compareTo(cutOverTime) <= 0) { - Collection expiringAcks = outstandingAckHandlers.get(messageExpiration); - outstandingAckHandlers.remove(messageExpiration); - List renewedAckHandlers = new ArrayList<>(expiringAcks.size()); - messageExpiration.extendExpiration(); - int extensionSeconds = - Ints.saturatedCast( - new Interval(now, messageExpiration.expiration) - .toDuration() - .getStandardSeconds()); - PendingModifyAckDeadline pendingModAckDeadline = - new PendingModifyAckDeadline(extensionSeconds); - for (AckHandler ackHandler : expiringAcks) { - if (ackHandler.acked.get()) { - continue; - } - pendingModAckDeadline.addAckId(ackHandler.ackId); - renewedAckHandlers.add(ackHandler); - } - modifyAckDeadlinesToSend.add(pendingModAckDeadline); - if (!renewedAckHandlers.isEmpty()) { - addOutstadingAckHandlers(messageExpiration, renewedAckHandlers); + while (!outstandingAckHandlers.isEmpty() + && outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) { + ExtensionJob job = outstandingAckHandlers.poll(); + + // If a message has already been acked, remove it, nothing to do. + for (int i = 0; i < job.ackHandlers.size(); ) { + if (job.ackHandlers.get(i).acked.get()) { + Collections.swap(job.ackHandlers, i, job.ackHandlers.size() - 1); + job.ackHandlers.remove(job.ackHandlers.size() - 1); } else { - outstandingAckHandlers.remove(messageExpiration); + i++; } } - if (nextScheduleExpiration == null - || nextScheduleExpiration.expiration.isAfter(messageExpiration.expiration)) { - nextScheduleExpiration = messageExpiration; + + if (job.ackHandlers.isEmpty()) { + continue; + } + + job.extendExpiration(now); + int extensionSeconds = + Ints.saturatedCast( + new Interval(now, job.expiration).toDuration().getStandardSeconds()); + PendingModifyAckDeadline pendingModAckDeadline = + new PendingModifyAckDeadline(extensionSeconds); + for (AckHandler ackHandler : job.ackHandlers) { + pendingModAckDeadline.addAckId(ackHandler.ackId); } + modifyAckDeadlinesToSend.add(pendingModAckDeadline); + renewJobs.add(job); + } + for (ExtensionJob job : renewJobs) { + outstandingAckHandlers.add(job); + } + if (!outstandingAckHandlers.isEmpty()) { + nextScheduleExpiration = outstandingAckHandlers.peek().expiration; } } @@ -404,8 +417,8 @@ public void run() { } } - private void setupNextAckDeadlineExtensionAlarm(ExpirationInfo messageExpiration) { - Instant possibleNextAlarmTime = messageExpiration.expiration.minus(ackExpirationPadding); + private void setupNextAckDeadlineExtensionAlarm(Instant expiration) { + Instant possibleNextAlarmTime = expiration.minus(ackExpirationPadding); alarmsLock.lock(); try { if (nextAckDeadlineExtensionAlarmTime.isAfter(possibleNextAlarmTime)) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java index 347bee595af2..b99eb2ab2885 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/testing/LocalPubSubHelper.java @@ -135,9 +135,7 @@ public void reset() throws IOException { */ @Override public void stop(Duration timeout) throws IOException, InterruptedException, TimeoutException { - System.err.println("sending"); sendPostRequest("/shutdown"); - System.err.println("sent"); waitForProcess(timeout); } }