From 59ef9fbd568baee15e7b46990148e9ef722e8b5c Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 4 Mar 2019 11:23:33 -0500 Subject: [PATCH 1/2] Remove global synchronization from MessageDispatcher. Now that this uses a LinkedBlockingDeque for batches, this is no longer necessary. --- .../cloud/pubsub/v1/MessageDispatcher.java | 48 +++++++++---------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 509d71ae5aeb..7e5032077faa 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -365,38 +365,34 @@ public void processReceivedMessages(List messages, Runnable don } messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size()); - synchronized (outstandingMessageBatches) { - outstandingMessageBatches.add(outstandingBatch); - } + outstandingMessageBatches.add(outstandingBatch); processOutstandingBatches(); } private void processOutstandingBatches() { - synchronized (outstandingMessageBatches) { - for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); - nextBatch != null; - nextBatch = outstandingMessageBatches.poll()) { - for (OutstandingMessage nextMessage = nextBatch.messages.poll(); - nextMessage != null; - nextMessage = nextBatch.messages.poll()) { - try { - // This is a non-blocking flow controller. - flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); - } catch (FlowController.MaxOutstandingElementCountReachedException - | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { - // Unwind previous changes in the batches outstanding. - nextBatch.messages.addFirst(nextMessage); - outstandingMessageBatches.addFirst(nextBatch); - return; - } catch (FlowControlException unexpectedException) { - throw new IllegalStateException( - "Flow control unexpected exception", unexpectedException); - } - processOutstandingMessage( - nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); + for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); + nextBatch != null; + nextBatch = outstandingMessageBatches.poll()) { + for (OutstandingMessage nextMessage = nextBatch.messages.poll(); + nextMessage != null; + nextMessage = nextBatch.messages.poll()) { + try { + // This is a non-blocking flow controller. + flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); + } catch (FlowController.MaxOutstandingElementCountReachedException + | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { + // Unwind previous changes in the batches outstanding. + nextBatch.messages.addFirst(nextMessage); + outstandingMessageBatches.addFirst(nextBatch); + return; + } catch (FlowControlException unexpectedException) { + throw new IllegalStateException( + "Flow control unexpected exception", unexpectedException); } - nextBatch.doneCallback.run(); + processOutstandingMessage( + nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); } + nextBatch.doneCallback.run(); } } From 74e4b4efd6d83f377626793d3a1cbd6fd69eaf04 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 4 Mar 2019 14:16:54 -0500 Subject: [PATCH 2/2] Run code format. --- .../google/cloud/pubsub/v1/MessageDispatcher.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 7e5032077faa..a851797500a0 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -371,11 +371,11 @@ public void processReceivedMessages(List messages, Runnable don private void processOutstandingBatches() { for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); - nextBatch != null; - nextBatch = outstandingMessageBatches.poll()) { + nextBatch != null; + nextBatch = outstandingMessageBatches.poll()) { for (OutstandingMessage nextMessage = nextBatch.messages.poll(); - nextMessage != null; - nextMessage = nextBatch.messages.poll()) { + nextMessage != null; + nextMessage = nextBatch.messages.poll()) { try { // This is a non-blocking flow controller. flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); @@ -386,11 +386,9 @@ private void processOutstandingBatches() { outstandingMessageBatches.addFirst(nextBatch); return; } catch (FlowControlException unexpectedException) { - throw new IllegalStateException( - "Flow control unexpected exception", unexpectedException); + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - processOutstandingMessage( - nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); + processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); } nextBatch.doneCallback.run(); }