8000 Max ack extension by davidtorres · Pull Request #1898 · googleapis/google-cloud-java · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c4c6174
Enabling client-side compression in the library, with an option to
davidtorres Feb 22, 2017
27338a4
Merge remote-tracking branch 'upstream/master'
davidtorres Feb 23, 2017
d6a1043
Revert "Enabling client-side compression in the library, with an opti…
davidtorres Feb 23, 2017
823ad09
Merge remote-tracking branch 'upstream/master'
davidtorres Mar 15, 2017
ec14abe
Changing the AckReplyConsumer interface to comply to just the Java 8 …
davidtorres Mar 15, 2017
652894a
Merge remote-tracking branch 'upstream/master'
davidtorres Mar 20, 2017
3e906e6
Example fixes to comply with changes to the AckReplyConsumer interface.
davidtorres Mar 20, 2017
32af932
Allowing for setting a maximum for message lease extensions.
davidtorres Mar 29, 2017
7779e37
Setting so the max ack deadline duration is always respected even when
davidtorres Mar 30, 2017
45248f0
Merge branch 'master' into max-ack-ext
davidtorres Apr 5, 2017
d4f0048
Merge remote-tracking branch 'upstream/master' 8000 ; into max-ack-ext
davidtorres Apr 5, 2017
140b6bc
Merge branch 'master' into max-ack-ext
davidtorres Apr 10, 2017
4debcbb
Align with the changes to ApiClock
davidtorres Apr 10, 2017
4883e6f
Adding a test for testing the default max ack extension duration is
davidtorres Apr 11, 2017
347c661
Addressing feedback:
davidtorres Apr 11, 2017
09172a6
Fixing race conditions in tests that for thread scheduling reason
davidtorres Apr 11, 2017
6a21a54
Addressing codacy warnings
davidtorres Apr 11, 2017
204438c
Adding documentation to the new added method in the
davidtorres Apr 12, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8000
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class MessageDispatcher {
private final ApiClock clock;

private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
private final MessageReceiver receiver;
private final AckProcessor ackProcessor;

Expand All @@ -87,20 +88,27 @@ class MessageDispatcher {
// it is not modified while inside the queue.
// The hashcode and equals methods are explicitly not implemented to discourage
// the use of this class as keys in maps or similar containers.
private static class ExtensionJob implements Comparable<ExtensionJob> {
private class ExtensionJob implements Comparable<ExtensionJob> {
Instant creation;
Instant expiration;
int nextExtensionSeconds;
ArrayList<AckHandler> ackHandlers;

ExtensionJob(
Instant expiration, int initialAckDeadlineExtension, ArrayList<AckHandler> ackHandlers) {
Instant creation,
Instant expiration,
int initialAckDeadlineExtension,
ArrayList<AckHandler> ackHandlers) {
this.creation = creation;
this.expiration = expiration;
nextExtensionSeconds = initialAckDeadlineExtension;
this.ackHandlers = ackHandlers;
}

void extendExpiration(Instant now) {
expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds));
Instant possibleExtension = now.plus(Duration.standardSeconds(nextExtensionSeconds));
Instant maxExtension = creation.plus(maxAckExtensionPeriod);
expiration = possibleExtension.isBefore(maxExtension) ? possibleExtension : maxExtension;
nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS);
}

Expand Down Expand Up @@ -217,12 +225,14 @@ void sendAckOperations(
MessageReceiver receiver,
AckProcessor ackProcessor,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
FlowController flowController,
ScheduledExecutorService executor,
ApiClock clock) {
this.executor = executor;
this.ackExpirationPadding = ackExpirationPadding;
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
Expand Down Expand Up @@ -305,7 +315,11 @@ public void run() {

synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
new ExtensionJob(
new Instant(clock.millisTime()),
expiration,
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

Expand Down Expand Up @@ -380,6 +394,13 @@ public void run() {
&& outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) {
ExtensionJob job = outstandingAckHandlers.poll();

if (maxAckExtensionPeriod.getMillis() > 0
&& job.creation.plus(maxAckExtensionPeriod).compareTo(now) <= 0) {
// The job has expired, according to the maxAckExtensionPeriod, we are just going to
// drop it.
continue;
}

// If a message has already been acked, remove it, nothing to do.
for (int i = 0; i < job.ackHandlers.size(); ) {
if (job.ackHandlers.get(i).acked.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public PollingSubscriberConnection(
Credentials credentials,
MessageReceiver receiver,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
Expand All @@ -82,6 +83,7 @@ public PollingSubscriberConnection(
receiver,
this,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
flowController,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public StreamingSubscriberConnection(
Credentials credentials,
MessageReceiver receiver,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
int streamAckDeadlineSeconds,
Distribution ackLatencyDistribution,
Channel channel,
Expand All @@ -85,6 +86,7 @@ public StreamingSubscriberConnection(
receiver,
this,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
flowController,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class Subscriber extends AbstractApiService {
private final String cachedSubscriptionNameString;
private final FlowControlSettings flowControlSettings;
private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
private final ScheduledExecutorService executor;
private final Distribution ackLatencyDistribution =
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
Expand All @@ -113,6 +114,7 @@ private Subscriber(Builder builder) throws IOException {
subscriptionName = builder.subscriptionName;
cachedSubscriptionNameString = subscriptionName.toString();
ackExpirationPadding = builder.ackExpirationPadding;
maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
streamAckDeadlineSeconds =
Math.max(
INITIAL_ACK_DEADLINE_SECONDS,
Expand Down Expand Up @@ -245,6 +247,7 @@ private void startStreamingConnections() {
credentials,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
streamAckDeadlineSeconds,
ackLatencyDistribution,
channelBuilder.build(),
Expand Down Expand Up @@ -321,6 +324,7 @@ private void startPollingConnections() {
credentials,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
channelBuilder.build(),
flowController,
Expand Down Expand Up @@ -409,6 +413,7 @@ public void run() {
public static final class Builder {
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100);
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);
private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.standardMinutes(60);

static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
Expand All @@ -423,6 +428,7 @@ public static final class Builder {
MessageReceiver receiver;

Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;

FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance();

Expand Down Expand Up @@ -483,6 +489,21 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
return this;
}

/**
* Set the maximum period a message ack deadline will be extended.
*
* <p>It is recommended to set this value to a reasonable upper bound of the subscriber time to
* process any message. This maximum period avoids messages to be <i>locked</i> by a subscriber
* in cases when the {@link AckReply} is lost.
*
* <p>A zero duration effectively disables auto deadline extensions.
*/
public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
Preconditions.checkArgument(maxAckExtensionPeriod.getMillis() >= 0);
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
return this;
}

/** Gives the ability to set a custom executor. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
Expand All @@ -500,3 +521,4 @@ public Subscriber build() throws IOException {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.AbstractExecutorService;
Expand All @@ -44,6 +46,7 @@ public class FakeScheduledExecutorService extends AbstractExecutorService
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final PriorityQueue<PendingCallable<?>> pendingCallables = new PriorityQueue<>();
private final FakeClock clock = new FakeClock();
private final Deque<Duration> expectedWorkQueue = new LinkedList<>();

public ApiClock getClock() {
return clock;
Expand Down Expand Up @@ -79,6 +82,35 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
Duration.millis(unit.toMillis(initialDelay)), command, PendingCallableType.FIXED_DELAY));
}

/**
* This allows for adding expectations on future work to be scheduled (
* {@link FakeScheduledExecutorService#schedule}
* or {@link FakeScheduledExecutorService#scheduleAtFixedRate}
* or {@link FakeScheduledExecutorService#scheduleWithFixedDelay}) based on its delay.
*/
public void setupScheduleExpectation(Duration delay) {
synchronized (expectedWorkQueue) {
expectedWorkQueue.add(delay);
}
}

/**
* Blocks the current thread until all the work
* {@link FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been
* scheduled in the executor.
*/
public void waitForExpectedWork() {
synchronized (expectedWorkQueue) {
while (!expectedWorkQueue.isEmpty()) {
try {
expectedWorkQueue.wait();
} catch (InterruptedException e) {
// Wait uninterruptibly
}
}
}
}

/**
* This will advance the reference time of the executor and execute (in the same thread) any
* outstanding callable which execution time has passed.
Expand All @@ -94,13 +126,14 @@ private void work() {
for (;;) {
PendingCallable<?> callable = null;
synchronized (pendingCallables) {
if (pendingCallables.isEmpty() || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
if (pendingCallables.isEmpty()
|| pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
break;
}
callable = pendingCallables.poll();
}
if (callable != null) {
try{
try {
callable.call();
} catch (Exception e) {
// We ignore any callable exception, which should be set to the future but not relevant to
Expand Down Expand Up @@ -182,6 +215,16 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
pendingCallables.add(callable);
}
work();
synchronized (expectedWorkQueue) {

This comment was marked as spam.

This comment was marked as spam.

// We compare by the callable delay in order decide when to remove expectations from the
// expected work queue, i.e. only the expected work that matches the delay of the scheduled
// callable is removed from the queue.
if (!expectedWorkQueue.isEmpty() && expectedWorkQueue.peek().equals(callable.delay)) {
expectedWorkQueue.poll();
}
expectedWorkQueue.notifyAll();
}

return callable.getScheduledFuture();
}

Expand Down
Loading
0