From 090326c12a96571fe0f975762679fa5cc71e88a9 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 25 Jan 2017 16:38:47 +1100 Subject: [PATCH 1/2] return Name objects instead of plain String --- .../CreateSubscriptionAndPullMessages.java | 3 +- .../CreateTopicAndPublishMessages.java | 2 +- .../google/cloud/pubsub/spi/v1/Publisher.java | 26 +++---- .../cloud/pubsub/spi/v1/Subscriber.java | 67 ++++++++----------- .../pubsub/spi/v1/PublisherImplTest.java | 12 ++-- .../pubsub/spi/v1/SubscriberImplTest.java | 3 +- 6 files changed, 50 insertions(+), 63 deletions(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java index 32a8a575b579..637a7b532ee8 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Service; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.SubscriptionName; @@ -52,7 +51,7 @@ public ListenableFuture receiveMessage(PubsubMessage m }; Subscriber subscriber = null; try { - subscriber = Subscriber.Builder.newBuilder(subscription, receiver).build(); + subscriber = Subscriber.newBuilder(subscription, receiver).build(); subscriber.addListener( new Subscriber.SubscriberListener() { @Override diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java index b5d40d24a23e..d57c16ce11c0 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java @@ -40,7 +40,7 @@ public static void main(String... args) throws Exception { Publisher publisher = null; try { - publisher = Publisher.Builder.newBuilder(topic).build(); + publisher = Publisher.newBuilder(topic).build(); List messages = Arrays.asList("first message", "second message"); List> messageIds = new ArrayList<>(); for (String message : messages) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 39941ae878fa..95414579336d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -82,7 +82,7 @@ * *

  *  Publisher publisher =
- *       Publisher.Builder.newBuilder(MY_TOPIC)
+ *       Publisher.newBuilder(MY_TOPIC)
  *           .setMaxBundleDuration(new Duration(10 * 1000))
  *           .build();
  *  List<ListenableFuture<String>> results = new ArrayList<>();
@@ -121,6 +121,7 @@ public static long getApiMaxRequestBytes() {
 
   private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
 
+  private final TopicName topicName;
   private final String topic;
 
   private final BundlingSettings bundlingSettings;
@@ -147,7 +148,8 @@ public static long getApiMaxRequestBytes() {
   private ScheduledFuture currentAlarmFuture;
 
   private Publisher(Builder builder) throws IOException {
-    topic = builder.topic;
+    topicName = builder.topicName;
+    topic = topicName.toString();
 
     this.bundlingSettings = builder.bundlingSettings;
     this.retrySettings = builder.retrySettings;
@@ -195,8 +197,8 @@ public void close() throws IOException {
   }
 
   /** Topic which the publisher publishes to. */
-  public String getTopic() {
-    return topic;
+  public TopicName getTopicName() {
+    return topicName;
   }
 
   /**
@@ -520,6 +522,11 @@ private boolean isRetryable(Throwable t) {
     }
   }
 
+  /** Constructs a new {@link Builder} using the given topic. */
+  public static Builder newBuilder(TopicName topicName) {
+    return new Builder(topicName);
+  }
+
   /** A builder of {@link Publisher}s. */
   public static final class Builder {
     static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
@@ -554,7 +561,7 @@ public static final class Builder {
             .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
             .build();
 
-    String topic;
+    TopicName topicName;
 
     // Bundling options
     BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
@@ -572,13 +579,8 @@ public static final class Builder {
 
     ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
 
-    /** Constructs a new {@link Builder} using the given topic. */
-    public static Builder newBuilder(TopicName topic) {
-      return new Builder(topic.toString());
-    }
-
-    Builder(String topic) {
-      this.topic = Preconditions.checkNotNull(topic);
+    private Builder(TopicName topic) {
+      this.topicName = Preconditions.checkNotNull(topic);
     }
 
     /**
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
index e4f7a4a6636e..96c2f5ab115a 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
@@ -32,8 +32,6 @@
 import com.google.common.util.concurrent.Service;
 import com.google.pubsub.v1.SubscriptionName;
 import io.grpc.ManagedChannelBuilder;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeoutException;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.grpc.netty.GrpcSslContexts;
@@ -43,9 +41,11 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,7 +88,7 @@
  * }
  *
  * Subscriber subscriber =
- *     Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
+ *     Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
  *         .setMaxBundleAcks(100)
  *         .build();
  *
@@ -123,19 +123,33 @@ private Subscriber(Builder builder) throws IOException {
     impl = new SubscriberImpl(builder);
   }
 
+  /**
+   * Constructs a new {@link Builder}.
+   *
+   * 

Once {@link #build()} is called a gRPC stub will be created for use of the {@link + * Subscriber}. + * + * @param subscription Cloud Pub/Sub subscription to bind the subscriber to + * @param receiver an implementation of {@link MessageReceiver} used to process the received + * messages + */ + public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) { + return new Builder(subscription, receiver); + } + /** Subscription which the subscriber is subscribed to. */ - public String getSubscription() { - return impl.getSubscription(); + public SubscriptionName getSubscriptionName() { + return impl.subscriptionName; } /** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */ public Duration getAckExpirationPadding() { - return impl.getAckExpirationPadding(); + return impl.ackExpirationPadding; } /** The flow control settings the Subscriber is configured with. */ public FlowController.Settings getFlowControlSettings() { - return impl.getFlowControlSettings(); + return impl.flowControlSettings; } public void addListener(final SubscriberListener listener, Executor executor) { @@ -249,6 +263,7 @@ public void terminated(State from) {} private static class SubscriberImpl extends AbstractService { private static final Logger logger = LoggerFactory.getLogger(Subscriber.class); + private final SubscriptionName subscriptionName; private final String subscription; private final FlowController.Settings flowControlSettings; private final Duration ackExpirationPadding; @@ -270,7 +285,8 @@ private static class SubscriberImpl extends AbstractService { private SubscriberImpl(Builder builder) throws IOException { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; - subscription = builder.subscription; + subscriptionName = builder.subscriptionName; + subscription = subscriptionName.toString(); ackExpirationPadding = builder.ackExpirationPadding; streamAckDeadlineSeconds = Math.max( @@ -496,21 +512,6 @@ public void run() { throw new IllegalStateException(e); } } - - /** Subscription which the subscriber is subscribed to. */ - public String getSubscription() { - return subscription; - } - - /** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */ - public Duration getAckExpirationPadding() { - return ackExpirationPadding; - } - - /** The flow control settings the Subscriber is configured with. */ - public FlowController.Settings getFlowControlSettings() { - return flowControlSettings; - } } /** Builder of {@link Subscriber Subscribers}. */ @@ -526,7 +527,7 @@ public static final class Builder { * Runtime.getRuntime().availableProcessors()) .build(); - String subscription; + SubscriptionName subscriptionName; Optional credentials = Optional.absent(); MessageReceiver receiver; @@ -539,22 +540,8 @@ public static final class Builder { Optional.absent(); Optional clock = Optional.absent(); - /** - * Constructs a new {@link Builder}. - * - *

Once {@link #build()} is called a gRPC stub will be created for use of the {@link - * Subscriber}. - * - * @param subscription Cloud Pub/Sub subscription to bind the subscriber to - * @param receiver an implementation of {@link MessageReceiver} used to process the received - * messages - */ - public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) { - return new Builder(subscription.toString(), receiver); - } - - Builder(String subscription, MessageReceiver receiver) { - this.subscription = subscription; + Builder(SubscriptionName subscriptionName, MessageReceiver receiver) { + this.subscriptionName = subscriptionName; this.receiver = receiver; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index e831763702af..c63c023072b6 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -353,7 +353,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce public void testPublisherGetters() throws Exception { FakeCredentials credentials = new FakeCredentials(); - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); + Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); builder.setChannelBuilder(testChannelBuilder); builder.setCredentials(credentials); builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR); @@ -371,7 +371,7 @@ public void testPublisherGetters() throws Exception { .build()); Publisher publisher = builder.build(); - assertEquals(TEST_TOPIC.toString(), publisher.getTopic()); + assertEquals(TEST_TOPIC, publisher.getTopicName()); assertEquals(10, (long) publisher.getBundlingSettings().getRequestByteThreshold()); assertEquals(new Duration(11), publisher.getBundlingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBundlingSettings().getElementCountThreshold()); @@ -385,8 +385,8 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); - assertEquals(TEST_TOPIC.toString(), builder.topic); + Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); + assertEquals(TEST_TOPIC, builder.topicName); assertEquals(Optional.absent(), builder.channelBuilder); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); assertFalse(builder.failOnFlowControlLimits); @@ -405,7 +405,7 @@ public void testBuilderParametersAndDefaults() { @Test public void testBuilderInvalidArguments() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); + Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); try { builder.setChannelBuilder(null); @@ -602,7 +602,7 @@ public void testBuilderInvalidArguments() { } private Builder getTestPublisherBuilder() { - return Publisher.Builder.newBuilder(TEST_TOPIC) + return Publisher.newBuilder(TEST_TOPIC) .setCredentials(testCredentials) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelBuilder(testChannelBuilder); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index aead67ee7ede..853d9c90bb44 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -29,7 +29,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Service.State; import com.google.common.util.concurrent.SettableFuture; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullResponse; @@ -467,7 +466,7 @@ private void sendMessages(Iterable ackIds) throws InterruptedException { } private Builder getTestSubscriberBuilder(MessageReceiver receiver) { - return Subscriber.Builder.newBuilder(TEST_SUBSCRIPTION, receiver) + return Subscriber.newBuilder(TEST_SUBSCRIPTION, receiver) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setCredentials(testCredentials) .setChannelBuilder(testChannelBuilder) From a288c44077e69ead750f2facb1378e6e86696354 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 26 Jan 2017 09:44:22 +1100 Subject: [PATCH 2/2] pr comments --- .../java/com/google/cloud/pubsub/spi/v1/Publisher.java | 8 ++++---- .../java/com/google/cloud/pubsub/spi/v1/Subscriber.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 9166d99d7316..20e2e291be86 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -123,7 +123,7 @@ public static long getApiMaxRequestBytes() { private static final Logger logger = LoggerFactory.getLogger(Publisher.class); private final TopicName topicName; - private final String topic; + private final String cachedTopicNameString; private final BundlingSettings bundlingSettings; private final RetrySettings retrySettings; @@ -151,7 +151,7 @@ public static long getApiMaxRequestBytes() { private Publisher(Builder builder) throws IOException { topicName = builder.topicName; - topic = topicName.toString(); + cachedTopicNameString = topicName.toString(); this.bundlingSettings = builder.bundlingSettings; this.retrySettings = builder.retrySettings; @@ -335,7 +335,7 @@ private void publishAllOutstanding() { private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) { PublishRequest.Builder publishRequest = PublishRequest.newBuilder(); - publishRequest.setTopic(topic); + publishRequest.setTopic(cachedTopicNameString); for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } @@ -529,7 +529,7 @@ private boolean isRetryable(Throwable t) { interface LongRandom { long nextLong(long least, long bound); } - + /** Constructs a new {@link Builder} using the given topic. */ public static Builder newBuilder(TopicName topicName) { return new Builder(topicName); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 96c2f5ab115a..cdbc14fef3c6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -264,7 +264,7 @@ private static class SubscriberImpl extends AbstractService { private static final Logger logger = LoggerFactory.getLogger(Subscriber.class); private final SubscriptionName subscriptionName; - private final String subscription; + private final String cachedSubscriptionNameString; private final FlowController.Settings flowControlSettings; private final Duration ackExpirationPadding; private final ScheduledExecutorService executor; @@ -286,7 +286,7 @@ private SubscriberImpl(Builder builder) throws IOException { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; - subscription = subscriptionName.toString(); + cachedSubscriptionNameString = subscriptionName.toString(); ackExpirationPadding = builder.ackExpirationPadding; streamAckDeadlineSeconds = Math.max( @@ -356,7 +356,7 @@ private void startStreamingConnections() { for (int i = 0; i < numChannels; i++) { streamingSubscriberConnections.add( new StreamingSubscriberConnection( - subscription, + cachedSubscriptionNameString, credentials, receiver, ackExpirationPadding, @@ -428,7 +428,7 @@ private void startPollingConnections() { for (int i = 0; i < numChannels; i++) { pollingSubscriberConnections.add( new PollingSubscriberConnection( - subscription, + cachedSubscriptionNameString, credentials, receiver, ackExpirationPadding,