8000 Deadlock issues by davidtorres · Pull Request #1884 · googleapis/google-cloud-java · GitHub
[go: up one dir, main page]

Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c4c6174
Enabling client-side compression in the library, with an option to
davidtorres Feb 22, 2017
27338a4
Merge remote-tracking branch 'upstream/master'
davidtorres Feb 23, 2017
d6a1043
Revert "Enabling client-side compression in the library, with an opti…
davidtorres Feb 23, 2017
823ad09
Merge remote-tracking branch 'upstream/master'
davidtorres Mar 15, 2017
ec14abe
Changing the AckReplyConsumer interface to comply to just the Java 8 …
davidtorres Mar 15, 2017
652894a
Merge remote-tracking branch 'upstream/master'
davidtorres Mar 20, 2017
3e906e6
Example fixes to comply with changes to the AckReplyConsumer interface.
davidtorres Mar 20, 2017
32af932
Allowing for setting a maximum for message lease extensions.
davidtorres Mar 29, 2017
7779e37
Setting so the max ack deadline duration is always respected even when
davidtorres Mar 30, 2017
45248f0
Merge branch 'master' into max-ack-ext
davidtorres Apr 5, 2017
d4f0048
Merge remote-tracking branch 'upstream/master' into max-ack-ext
davidtorres Apr 5, 2017
f180f8c
Merge remote-tracking branch 'upstream/master'
davidtorres Apr 5, 2017
f5f1151
Merge branch 'max-ack-ext'
davidtorres Apr 5, 2017
dde0efb
Merge branch 'master' into master
pongad Apr 6, 2017
eb96c97
Merge branch 'master' into master
pongad Apr 6, 2017
8b91be3
Merge remote-tracking branch 'upstream/master'
davidtorres Apr 6, 2017
3d3a166
Fix a merge/compilation error with clock, clock.millis() -> clock.mil…
davidtorres Apr 6, 2017
b555c97
Decouple the processing of new received messages from the dispatching to
davidtorres Apr 7, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.ApiClock;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.api.stats.Distribution;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
Expand All @@ -29,11 +31,13 @@
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -56,11 +60,15 @@ class MessageDispatcher {
private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2;
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m

private static final ScheduledExecutorService alarmsExecutor =
Executors.newScheduledThreadPool(2);

private final ScheduledExecutorService executor;
private final ApiClock clock;

private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
private final MessageReceiver receiver;
private final AckProcessor ackProcessor;

Expand All @@ -87,20 +95,27 @@ class MessageDispatcher {
// it is not modified while inside the queue.
// The hashcode and equals methods are explicitly not implemented to discourage
// the use of this class as keys in maps or similar containers.
private static class ExtensionJob implements Comparable<ExtensionJob> {
private class ExtensionJob implements Comparable<ExtensionJob> {
Instant creation;
Instant expiration;
int nextExtensionSeconds;
ArrayList<AckHandler> ackHandlers;

ExtensionJob(
Instant expiration, int initialAckDeadlineExtension, ArrayList<AckHandler> ackHandlers) {
Instant creation,
Instant expiration,
int initialAckDeadlineExtension,
ArrayList<AckHandler> ackHandlers) {
this.creation = creation;
this.expiration = expiration;
nextExtensionSeconds = initialAckDeadlineExtension;
this.ackHandlers = ackHandlers;
}

void extendExpiration(Instant now) {
expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds));
Instant possibleExtension = now.plus(Duration.standardSeconds(nextExtensionSeconds));
Instant maxExtension = creation.plus(maxAckExtensionPeriod);
expiration = possibleExtension.isBefore(maxExtension) ? possibleExtension : maxExtension;
nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS);
}

Expand Down Expand Up @@ -176,6 +191,7 @@ public void onFailure(Throwable t) {
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}

@Override
Expand All @@ -186,25 +202,23 @@ public void onSuccess(AckReply reply) {
synchronized (pendingAcks) {
pendingAcks.add(ackId);
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D)));
messagesWaiter.incrementPendingMessages(-1);
return;
break;
case NACK:
synchronized (pendingNacks) {
pendingNacks.add(ackId);
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
return;
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}
}

Expand All @@ -217,12 +231,14 @@ void sendAckOperations(
MessageReceiver receiver,
AckProcessor ackProcessor,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
FlowController flowController,
ScheduledExecutorService executor,
ApiClock clock) {
this.executor = executor;
this.ackExpirationPadding = ackExpirationPadding;
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
Expand Down Expand Up @@ -259,28 +275,117 @@ public int getMessageDeadlineSeconds() {
return messageDeadlineSeconds;
}

public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
int receivedMessagesCount = responseMessages.size();
if (receivedMessagesCount == 0) {
static class OutstandingMessagesBatch {
static class OutstandingMessage {
private final com.google.pubsub.v1.ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
this.ackHandler = ackHandler;
}

public com.google.pubsub.v1.ReceivedMessage receivedMessage() {
return receivedMessage;
}

public AckHandler ackHandler() {
return ackHandler;
}
}

private final Deque<OutstandingMessage> messages;
private final Runnable doneCallback;

public OutstandingMessagesBatch(Runnable doneCallback) {
this.messages = new LinkedList<>();
this.doneCallback = doneCallback;
}

public void addMessage(
com.google.pubsub.v1.ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
}

public Deque<OutstandingMessage> messages() {
return messages;
}

public void done() {
doneCallback.run();
}
}

Deque<OutstandingMessagesBatch> outstandingMessageBatches = new LinkedList<>();

public void processReceivedMessages(
List<com.google.pubsub.v1.ReceivedMessage> messages, Runnable doneCallback) {
if (messages.size() == 0) {
doneCallback.run();
return;
}
Instant now = new Instant(clock.millisTime());
int totalByteCount = 0;
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
for (ReceivedMessage pubsubMessage : responseMessages) {
int messageSize = pubsubMessage.getMessage().getSerializedSize();
totalByteCount += messageSize;
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
messagesWaiter.incrementPendingMessages(messages.size());

OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
ackHandlers.add(ackHandler);
outstandingBatch.addMessage(message, ackHandler);
}
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
logger.log(
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});

messagesWaiter.incrementPendingMessages(responseMessages.size());
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
for (ReceivedMessage userMessage : responseMessages) {
final PubsubMessage message = userMessage.getMessage();
final AckHandler ackHandler = acksIterator.next();

Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000);
synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(
new Instant(clock.millisTime()),
expiration,
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
}
processOutstandingBatches();
}

public void processOutstandingBatches() {
while (true) {
boolean batchDone = false;
Runnable batchCallback = null;
OutstandingMessage outstandingMessage;
synchronized (outstandingMessageBatches) {
OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek();
if (nextBatch == null) {
return;
}
outstandingMessage = nextBatch.messages.peek();
if (outstandingMessage == null) {
return;
}
try {
// This is a non-blocking flow controller.
flowController.reserve(
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
} catch (FlowController.MaxOutstandingElementCountReachedException
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
return;
} catch (FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
nextBatch.messages.poll(); // We got a hold to the message already.
batchDone = nextBatch.messages.isEmpty();
if (batchDone) {
outstandingMessageBatches.poll();
batchCallback = nextBatch.doneCallback;
}
}

final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
final AckHandler ackHandler = outstandingMessage.ackHandler();
final SettableFuture<AckReply> response = SettableFuture.create();
final AckReplyConsumer consumer =
new AckReplyConsumer() {
Expand All @@ -301,18 +406,9 @@ public void run() {
}
}
});
}

synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

try {
flowController.reserve(receivedMessagesCount, totalByteCount);
} catch (FlowController.FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
if (batchDone) {
batchCallback.run();
}
}
}

Expand All @@ -321,7 +417,7 @@ private void setupPendingAcksAlarm() {
try {
if (pendingAcksAlarm == null) {
pendingAcksAlarm =
executor.schedule(
alarmsExecutor.schedule(
new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -380,6 +476,13 @@ public void run() {
&& outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) {
ExtensionJob job = outstandingAckHandlers.poll();

if (maxAckExtensionPeriod.getMillis() > 0
&& job.creation.plus(maxAckExtensionPeriod).compareTo(now) <= 0) {
// The job has expired, according to the maxAckExtensionPeriod, we are just going to
// drop it.
continue;
}

// If a message has already been acked, remove it, nothing to do.
for (int i = 0; i < job.ackHandlers.size(); ) {
if (job.ackHandlers.get(i).acked.get()) {
Expand Down Expand Up @@ -443,7 +546,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;

ackDeadlineExtensionAlarm =
executor.schedule(
alarmsExecutor.schedule(
new AckDeadlineAlarm(),
nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(),
TimeUnit.MILLISECONDS);
Expand Down
Loading
0