deadlock in testStreamAckDeadlineUpdate · Issue #1577 · googleapis/google-cloud-java · GitHub

deadlock in testStreamAckDeadlineUpdate #1577

@pongad

Description


@garrettjonesgoogle @davidtorres @kir-titievsky

I tracked down another deadlock, but this one I cannot fix without changing the surface.

In the happy case, the test

  • receives a message and creates a future to respond
  • advances time by 20s
  • sets the future result to ACK
    • this updates the latency distribution
    • the next time we configure the stream latency, we'll set it to 20s
  • advances time by 60s
    • we configure the stream latency every minute
    • so this tells the server to change our stream latency to 20s
  • YAY!

The mechanism we use to update the latency is that we add a callback to the ListenableFuture returned by MessageReceiver::receiveMessage. The idea is that once the user finishes the work, the latency distribution is automatically updated.
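Concretely, the delivery path looks roughly like the sketch below. This is not the actual code in this repo; receiver, clock, and ackLatencyDistribution are hypothetical stand-ins for the subscriber's internals, and Futures/FutureCallback are the usual Guava classes.

// Hypothetical sketch of the callback-based latency update described above.
void onMessageDelivered(final PubsubMessage message) {
  final long receivedAtMillis = clock.millis();
  ListenableFuture<AckReply> reply = receiver.receiveMessage(message);
  Futures.addCallback(reply, new FutureCallback<AckReply>() {
    @Override
    public void onSuccess(AckReply result) {
      // The latency sample is taken when this callback runs: at completion
      // time if the callback was added before the future completed, but at
      // addCallback time if the future was already done.
      ackLatencyDistribution.record(clock.millis() - receivedAtMillis);
    }

    @Override
    public void onFailure(Throwable t) {
      // error handling elided
    }
  });
}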

In the bad case:

  • the gRPC thread calls MessageReceiver::receiveMessage
    • the test implementation creates a future and puts it in a queue to be retrieved by the test code
  • the test advances time by 20s
  • the test thread takes the future above out of the queue and sets the future result
  • the test advances time by 60s
  • then, the gRPC thread finally adds the callback (let's call this step S1)
    • since the future is already completed, we update the latency distribution with 80s
  • the minute for configuring stream latency has also passed (let's call this step S2)
    • so we change the stream latency to 80s

In an even worse case, S1 and S2 can happen in the opposite order. Since there are then no entries in the distribution, we don't update the server with anything, and the latency on the server stays at 10s (the default).

The test code expects the latency value on the server to eventually be set to 20s. In either bad case, the test deadlocks.

I don't think this test can be fixed with the current surface. Since the Future is returned by MessageReceiver, it stands to reason that other threads can do arbitrary things to the future before the callback can be added. Note that this problem affects production code as well: the synchronization is still technically incorrect, though the consequence is small enough that it will probably never matter.
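To make the race concrete, here is a small sketch of step S1 arriving late, using the same hypothetical bookkeeping as above. Guava's addCallback runs the callback immediately on the adding thread when the future is already done, which is exactly how the 80s sample gets recorded.

// Step S1 happening after the future is already done (a sketch; the times
// refer to the fake clock in the test).
final long receivedAtMillis = clock.millis();              // t = 0s
SettableFuture<AckReply> reply = SettableFuture.create();

reply.set(AckReply.ACK);                                    // test thread, t = 20s
// ... the test then advances the clock by another 60s ...

Futures.addCallback(reply, new FutureCallback<AckReply>() { // gRPC thread, t = 80s (S1)
  @Override
  public void onSuccess(AckReply result) {
    // The future is already done, so this runs immediately on the adding
    // thread and records 80s instead of the 20s the test expects.
    ackLatencyDistribution.record(clock.millis() - receivedAtMillis);
  }

  @Override
  public void onFailure(Throwable t) {}
});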

I can think of a couple of ways to fix it:
Current

// Definition
ListenableFuture<AckReply> receiveMessage(PubsubMessage message);

// Sample implementation (assuming executor is a ListeningExecutorService,
// so submit returns a ListenableFuture)
ListenableFuture<AckReply> receiveMessage(final PubsubMessage message) {
  return executor.submit(new Callable<AckReply>() {
    @Override
    public AckReply call() {
      if (someLongComputation(message)) {
        return AckReply.ACK;
      }
      return AckReply.NACK;
    }
  });
}

Option 1

// Definition
void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response);

// Sample implementation (again assuming executor is a ListeningExecutorService)
void receiveMessage(final PubsubMessage message, final SettableFuture<AckReply> response) {
  ListenableFuture<AckReply> future = executor.submit(new Callable<AckReply>() {
    @Override
    public AckReply call() {
      if (someLongComputation(message)) {
        return AckReply.ACK;
      }
      return AckReply.NACK;
    }
  });
  response.setFuture(future);
}

In this way, we create the future for the user and can make sure the callback is added before the user can do anything with it. Job scheduling is still left to the user: e.g., if the user wants to process many messages in a thread pool, the user must create the thread pool themselves.
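For illustration, the subscriber side could then look something like this (again a sketch with the hypothetical bookkeeping from above); the important part is that the callback is attached before user code ever sees the future:

// Hypothetical delivery path under Option 1.
void onMessageDelivered(final PubsubMessage message, MessageReceiver receiver) {
  final long receivedAtMillis = clock.millis();
  SettableFuture<AckReply> response = SettableFuture.create();
  Futures.addCallback(response, new FutureCallback<AckReply>() {
    @Override
    public void onSuccess(AckReply result) {
      ackLatencyDistribution.record(clock.millis() - receivedAtMillis);
    }

    @Override
    public void onFailure(Throwable t) {
      // error handling elided
    }
  });
  // The callback is already in place by the time user code can touch the future.
  receiver.receiveMessage(message, response);
}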

Option 2

// Definition
AckReply receiveMessage(PubsubMessage message);
// This is essentially the same as Function<PubsubMessage, AckReply>

// Sample implementation
AckReply receiveMessage(PubsubMessage message) {
  if (someLongComputation(message)) {
    return AckReply.ACK;
  }
  return AckReply.NACK;
}

We would call the function from a thread pool ourselves, so the function must be thread safe. We can increase the number of threads in the pool to cope with extra load, and the user can always set the number of threads to suit their workload.
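For illustration, the subscriber side could look something like this under Option 2 (a sketch; executor is the pool whose size the user would configure, and clock/ackLatencyDistribution are the same hypothetical bookkeeping as above):

// Hypothetical delivery path under Option 2: the subscriber owns scheduling.
void onMessageDelivered(final PubsubMessage message, final MessageReceiver receiver) {
  final long receivedAtMillis = clock.millis();
  executor.execute(new Runnable() {
    @Override
    public void run() {
      // receiveMessage must be thread safe; many pool threads call it concurrently.
      AckReply reply = receiver.receiveMessage(message);
      ackLatencyDistribution.record(clock.millis() - receivedAtMillis);
      // ... send the ack/nack for this message back to the server ...
    }
  });
}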
