8000 Change AckReplyConsumer to expose individual methods for replying by davidtorres · Pull Request #1899 · googleapis/google-cloud-java · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ public void run(Tuple<SubscriptionName, Long> params) throws Exception {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
messageCount.incrementAndGet();
consumer.accept(AckReply.ACK);
consumer.ack();
}
};
SubscriptionName subscriptionName = params.x();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.examples.pubsub.snippets;

import com.google.cloud.pubsub.spi.v1.AckReply;
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Subscriber;
Expand All @@ -31,7 +30,7 @@
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull subscription and
* asynchronously pull messages from it.
*/
public class CreateSubscriptionAndPullMessages {
public class CreateSubscriptionAndConsumeMessages {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


public static void main(String... args) throws Exception {
// [START async_pull_subscription]
Expand All @@ -47,7 +46,7 @@ public static void main(String... args) throws Exception {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("got message: " + message.getData().toStringUtf8());
consumer.accept(AckReply.ACK);
consumer.ack();
}
};
Subscriber subscriber = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

package com.google.cloud.examples.pubsub.snippets;

import com.google.cloud.pubsub.spi.v1.AckReply;
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -51,9 +50,9 @@ public MessageReceiver messageReceiver() {
MessageReceiver receiver = new MessageReceiver() {
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
if (blockingQueue.offer(message)) {
consumer.accept(AckReply.ACK);
consumer.ack();
} else {
consumer.accept(AckReply.NACK);
consumer.nack();
}
}
};
Expand Down
5 changes: 2 additions & 3 deletions google-cloud-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ With Pub/Sub you can pull messages from a subscription. Add the following import
file:
8000
```java
import com.google.cloud.pubsub.spi.v1.AckReply;
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Subscriber;
Expand All @@ -175,7 +174,7 @@ MessageReceiver receiver =
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("got message: " + message.getData().toStringUtf8());
consumer.accept(AckReply.ACK, null);
consumer.ack();
}
};
Subscriber subscriber = null;
Expand Down Expand Up @@ -204,7 +203,7 @@ try {
In
[CreateTopicAndPublishMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java)
and
[CreateSubscriptionAndPullMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java)
[CreateSubscriptionAndConsumeMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndConsumeMessages.java)
we put together all the code shown above into two programs. The programs assume that you are
running on Compute Engine, App Engine Flexible or from your own desktop.

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@

/**
* Accepts a reply, sending it to the service.
*
* <p>Both the interface and its method is named after the Java 8's {@code Consumer} interface
* to make migration to Java 8 and adopting its patterns easier.
*/
public interface AckReplyConsumer {
void accept(AckReply ackReply);
/**
* Acknowledges that the message has been successfully processed. The service will not send the
* message again.
*/
void ack();

/**
* Signals that the message has not been successfully processed. The service should resend the
* message.
*/
void nack(); 9E88
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ public String toString() {
}
}

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
NACK
}

/**
* Handles callbacks for acking/nacking messages from the {@link
* com.google.cloud.pubsub.spi.v1.MessageReceiver}.
Expand Down Expand Up @@ -285,8 +291,13 @@ public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> r
final AckReplyConsumer consumer =
new AckReplyConsumer() {
@Override
public void accept(AckReply reply) {
response.set(reply);
public void ack() {
response.set(AckReply.ACK);
}

@Override
public void nack() {
response.set(AckReply.NACK);
}
};
Futures.addCallback(response, ackHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
public interface MessageReceiver {
/**
* Called when a message is received by the subscriber. The implementation must arrange for {@link
* AckReplyConsumer#accept} to be called after processing the {@code message}.
* AckReplyConsumer#ack()} or {@link
* AckReplyConsumer#nack()} to be called after processing the {@code message}.
*
* <p>This {@code MessageReceiver} passes all messages to a {@code BlockingQueue}.
* This method can be called concurrently from multiple threads,
Expand All @@ -34,9 +35,9 @@ public interface MessageReceiver {
* MessageReceiver receiver = new MessageReceiver() {
* public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
* if (blockingQueue.offer(message)) {
* consumer.accept(AckReply.ACK, null);
* consumer.ack();
* } else {
* consumer.accept(AckReply.NACK, null);
* consumer.nack();
* }
* }
* };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
*
* <p>A {@link Subscriber} allows you to provide an implementation of a {@link MessageReceiver
* receiver} to which messages are going to be delivered as soon as they are received by the
* subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link AckReply#NACK
* nacked} at will as they get processed by the receiver. Nacking a messages implies a later
* redelivery of such message.
* subscriber. The delivered messages then can be {@link AckReplyConsumer#ack() acked} or {@link
* AckReplyConsumer#nack() nacked} at will as they get processed by the receiver. Nacking a messages
* implies a later redelivery of such message.
*
* <p>The subscriber handles the ack management, by automatically extending the ack deadline while
* the message is being processed, to then issue the ack or nack of such message when the processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.api.gax.core.SettableApiFuture;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.spi.v1.AckReply;
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Publisher;
Expand Down Expand Up @@ -116,9 +115,9 @@ public void testPublishSubscribe() throws Exception {
public void receiveMessage(
final PubsubMessage message, final AckReplyConsumer consumer) {
if (received.set(message)) {
consumer.accept(AckReply.ACK);
consumer.ack();
} else {
consumer.accept(AckReply.NACK);
consumer.nack();
}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,17 @@ public static Collection<Object[]> data() {
static class TestReceiver implements MessageReceiver {
private final LinkedBlockingQueue<AckReplyConsumer> outstandingMessageReplies =
new LinkedBlockingQueue<>();
private AckReply ackReply = AckReply.ACK;
private boolean shouldAck = true; // If false, the receiver will <b>nack</b> the messages
private Optional<CountDownLatch> messageCountLatch = Optional.absent();
private Optional<RuntimeException> error = Optional.absent();
private boolean explicitAckReplies;

void setReply(AckReply ackReply) {
this.ackReply = ackReply;
void setAckReply() {
this.shouldAck = true;
}

void setNackReply() {
this.shouldAck = false;
}

void setErrorReply(RuntimeException error) {
Expand Down Expand Up @@ -149,7 +153,11 @@ private void replyTo(AckReplyConsumer reply) {
if (error.isPresent()) {
throw error.get();
} else {
reply.accept(ackReply);
if (shouldAck) {
reply.ack();
} else {
reply.nack();
}
}
}
}
Expand Down Expand Up @@ -196,7 +204,7 @@ public void testAckSingleMessage() throws Exception {
public void testNackSingleMessage() throws Exception {
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));

testReceiver.setReply(AckReply.NACK);
testReceiver.setNackReply();
sendMessages(ImmutableList.of("A"));

// Trigger ack sending
Expand Down Expand Up @@ -257,7 +265,7 @@ public void testBatchAcksAndNacks() throws Exception {
// Send messages to be nacked
List<String> testAckIdsBatch2 = ImmutableList.of("D", "E");
// Nack messages
testReceiver.setReply(AckReply.NACK);
testReceiver.setNackReply();
sendMessages(testAckIdsBatch2);

// Trigger ack sending
Expand Down
0