diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 509d71ae5aeb..a851797500a0 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -365,38 +365,32 @@ public void processReceivedMessages(List messages, Runnable don } messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size()); - synchronized (outstandingMessageBatches) { - outstandingMessageBatches.add(outstandingBatch); - } + outstandingMessageBatches.add(outstandingBatch); processOutstandingBatches(); } private void processOutstandingBatches() { - synchronized (outstandingMessageBatches) { - for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); - nextBatch != null; - nextBatch = outstandingMessageBatches.poll()) { - for (OutstandingMessage nextMessage = nextBatch.messages.poll(); - nextMessage != null; - nextMessage = nextBatch.messages.poll()) { - try { - // This is a non-blocking flow controller. - flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); - } catch (FlowController.MaxOutstandingElementCountReachedException - | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { - // Unwind previous changes in the batches outstanding. - nextBatch.messages.addFirst(nextMessage); - outstandingMessageBatches.addFirst(nextBatch); - return; - } catch (FlowControlException unexpectedException) { - throw new IllegalStateException( - "Flow control unexpected exception", unexpectedException); - } - processOutstandingMessage( - nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); + for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); + nextBatch != null; + nextBatch = outstandingMessageBatches.poll()) { + for (OutstandingMessage nextMessage = nextBatch.messages.poll(); + nextMessage != null; + nextMessage = nextBatch.messages.poll()) { + try { + // This is a non-blocking flow controller. + flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); + } catch (FlowController.MaxOutstandingElementCountReachedException + | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { + // Unwind previous changes in the batches outstanding. + nextBatch.messages.addFirst(nextMessage); + outstandingMessageBatches.addFirst(nextBatch); + return; + } catch (FlowControlException unexpectedException) { + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - nextBatch.doneCallback.run(); + processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); } + nextBatch.doneCallback.run(); } }