diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java index 826ee4fd3f4b..6eea64c0454f 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java @@ -502,7 +502,7 @@ public void run(Tuple params) throws Exception { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { messageCount.incrementAndGet(); - consumer.accept(AckReply.ACK); + consumer.ack(); } }; SubscriptionName subscriptionName = params.x(); diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndConsumeMessages.java similarity index 95% rename from google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java rename to google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndConsumeMessages.java index 2fece107b678..0ed26ef30129 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndConsumeMessages.java @@ -16,7 +16,6 @@ package com.google.cloud.examples.pubsub.snippets; -import com.google.cloud.pubsub.spi.v1.AckReply; import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.cloud.pubsub.spi.v1.Subscriber; @@ -31,7 +30,7 @@ * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull subscription and * asynchronously pull messages from it. */ -public class CreateSubscriptionAndPullMessages { +public class CreateSubscriptionAndConsumeMessages { public static void main(String... args) throws Exception { // [START async_pull_subscription] @@ -47,7 +46,7 @@ public static void main(String... args) throws Exception { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { System.out.println("got message: " + message.getData().toStringUtf8()); - consumer.accept(AckReply.ACK); + consumer.ack(); } }; Subscriber subscriber = null; diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java index d315c1e0709f..61f50ffd4fd4 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/MessageReceiverSnippets.java @@ -22,7 +22,6 @@ package com.google.cloud.examples.pubsub.snippets; -import com.google.cloud.pubsub.spi.v1.AckReply; import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.pubsub.v1.PubsubMessage; @@ -51,9 +50,9 @@ public MessageReceiver messageReceiver() { MessageReceiver receiver = new MessageReceiver() { public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { if (blockingQueue.offer(message)) { - consumer.accept(AckReply.ACK); + consumer.ack(); } else { - consumer.accept(AckReply.NACK); + consumer.nack(); } } }; diff --git a/google-cloud-pubsub/README.md b/google-cloud-pubsub/README.md index 2dac9d0d8ef8..57de1f228005 100644 --- a/google-cloud-pubsub/README.md +++ b/google-cloud-pubsub/README.md @@ -158,7 +158,6 @@ With Pub/Sub you can pull messages from a subscription. Add the following import file: ```java -import com.google.cloud.pubsub.spi.v1.AckReply; import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.cloud.pubsub.spi.v1.Subscriber; @@ -175,7 +174,7 @@ MessageReceiver receiver = @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { System.out.println("got message: " + message.getData().toStringUtf8()); - consumer.accept(AckReply.ACK, null); + consumer.ack(); } }; Subscriber subscriber = null; @@ -204,7 +203,7 @@ try { In [CreateTopicAndPublishMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java) and -[CreateSubscriptionAndPullMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java) +[CreateSubscriptionAndConsumeMessages.java](../google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndConsumeMessages.java) we put together all the code shown above into two programs. The programs assume that you are running on Compute Engine, App Engine Flexible or from your own desktop. diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java deleted file mode 100644 index 64d4505ae16f..000000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReply.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2017 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.spi.v1; - -/** A reply to a Pubsub message, to be sent back to the service. */ -public enum AckReply { - /** - * Acknowledges that the message has been successfully processed. The service will not send the - * message again. - */ - ACK, - /** - * Signals that the message has not been successfully processed. The service will resend the - * message. - */ - NACK -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java index 8e7ff3837ac5..7870e0c3b525 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/AckReplyConsumer.java @@ -18,10 +18,17 @@ /** * Accepts a reply, sending it to the service. - * - *

Both the interface and its method is named after the Java 8's {@code Consumer} interface - * to make migration to Java 8 and adopting its patterns easier. */ public interface AckReplyConsumer { - void accept(AckReply ackReply); + /** + * Acknowledges that the message has been successfully processed. The service will not send the + * message again. + */ + void ack(); + + /** + * Signals that the message has not been successfully processed. The service should resend the + * message. + */ + void nack(); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index b6b7318e30c7..5d1351e737ef 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -146,6 +146,12 @@ public String toString() { } } + /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */ + public enum AckReply { + ACK, + NACK + } + /** * Handles callbacks for acking/nacking messages from the {@link * com.google.cloud.pubsub.spi.v1.MessageReceiver}. @@ -285,8 +291,13 @@ public void processReceivedMessages(List r final AckReplyConsumer consumer = new AckReplyConsumer() { @Override - public void accept(AckReply reply) { - response.set(reply); + public void ack() { + response.set(AckReply.ACK); + } + + @Override + public void nack() { + response.set(AckReply.NACK); } }; Futures.addCallback(response, ackHandler); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java index 181bdf943612..7f7e3381a53d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageReceiver.java @@ -22,7 +22,8 @@ public interface MessageReceiver { /** * Called when a message is received by the subscriber. The implementation must arrange for {@link - * AckReplyConsumer#accept} to be called after processing the {@code message}. + * AckReplyConsumer#ack()} or {@link + * AckReplyConsumer#nack()} to be called after processing the {@code message}. * *

This {@code MessageReceiver} passes all messages to a {@code BlockingQueue}. * This method can be called concurrently from multiple threads, @@ -34,9 +35,9 @@ public interface MessageReceiver { * MessageReceiver receiver = new MessageReceiver() { * public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { * if (blockingQueue.offer(message)) { - * consumer.accept(AckReply.ACK, null); + * consumer.ack(); * } else { - * consumer.accept(AckReply.NACK, null); + * consumer.nack(); * } * } * }; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index f46c5eb349c1..be5ad944d4f2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -55,9 +55,9 @@ * *

A {@link Subscriber} allows you to provide an implementation of a {@link MessageReceiver * receiver} to which messages are going to be delivered as soon as they are received by the - * subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link AckReply#NACK - * nacked} at will as they get processed by the receiver. Nacking a messages implies a later - * redelivery of such message. + * subscriber. The delivered messages then can be {@link AckReplyConsumer#ack() acked} or {@link + * AckReplyConsumer#nack() nacked} at will as they get processed by the receiver. Nacking a messages + * implies a later redelivery of such message. * *

The subscriber handles the ack management, by automatically extending the ack deadline while * the message is being processed, to then issue the ack or nack of such message when the processing diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index eba7a6eedc48..0e051254b9ba 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -21,7 +21,6 @@ import com.google.api.gax.core.SettableApiFuture; import com.google.cloud.ServiceOptions; -import com.google.cloud.pubsub.spi.v1.AckReply; import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.cloud.pubsub.spi.v1.Publisher; @@ -116,9 +115,9 @@ public void testPublishSubscribe() throws Exception { public void receiveMessage( final PubsubMessage message, final AckReplyConsumer consumer) { if (received.set(message)) { - consumer.accept(AckReply.ACK); + consumer.ack(); } else { - consumer.accept(AckReply.NACK); + consumer.nack(); } } }) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 19df0b265fb7..6c95bd9ece7c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -82,13 +82,17 @@ public static Collection data() { static class TestReceiver implements MessageReceiver { private final LinkedBlockingQueue outstandingMessageReplies = new LinkedBlockingQueue<>(); - private AckReply ackReply = AckReply.ACK; + private boolean shouldAck = true; // If false, the receiver will nack the messages private Optional messageCountLatch = Optional.absent(); private Optional error = Optional.absent(); private boolean explicitAckReplies; - void setReply(AckReply ackReply) { - this.ackReply = ackReply; + void setAckReply() { + this.shouldAck = true; + } + + void setNackReply() { + this.shouldAck = false; } void setErrorReply(RuntimeException error) { @@ -149,7 +153,11 @@ private void replyTo(AckReplyConsumer reply) { if (error.isPresent()) { throw error.get(); } else { - reply.accept(ackReply); + if (shouldAck) { + reply.ack(); + } else { + reply.nack(); + } } } } @@ -196,7 +204,7 @@ public void testAckSingleMessage() throws Exception { public void testNackSingleMessage() throws Exception { Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver)); - testReceiver.setReply(AckReply.NACK); + testReceiver.setNackReply(); sendMessages(ImmutableList.of("A")); // Trigger ack sending @@ -257,7 +265,7 @@ public void testBatchAcksAndNacks() throws Exception { // Send messages to be nacked List testAckIdsBatch2 = ImmutableList.of("D", "E"); // Nack messages - testReceiver.setReply(AckReply.NACK); + testReceiver.setNackReply(); sendMessages(testAckIdsBatch2); // Trigger ack sending