diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java index 32a8a575b579..637a7b532ee8 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Service; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.SubscriptionName; @@ -52,7 +51,7 @@ public ListenableFuture receiveMessage(PubsubMessage m }; Subscriber subscriber = null; try { - subscriber = Subscriber.Builder.newBuilder(subscription, receiver).build(); + subscriber = Subscriber.newBuilder(subscription, receiver).build(); subscriber.addListener( new Subscriber.SubscriberListener() { @Override diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java index b5d40d24a23e..d57c16ce11c0 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java @@ -40,7 +40,7 @@ public static void main(String... args) throws Exception { Publisher publisher = null; try { - publisher = Publisher.Builder.newBuilder(topic).build(); + publisher = Publisher.newBuilder(topic).build(); List messages = Arrays.asList("first message", "second message"); List> messageIds = new ArrayList<>(); for (String message : messages) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 377c93e99483..20e2e291be86 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -83,7 +83,7 @@ * *

  *  Publisher publisher =
- *       Publisher.Builder.newBuilder(MY_TOPIC)
+ *       Publisher.newBuilder(MY_TOPIC)
  *           .setMaxBundleDuration(new Duration(10 * 1000))
  *           .build();
  *  List<ListenableFuture<String>> results = new ArrayList<>();
@@ -122,7 +122,8 @@ public static long getApiMaxRequestBytes() {
 
   private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
 
-  private final String topic;
+  private final TopicName topicName;
+  private final String cachedTopicNameString;
 
   private final BundlingSettings bundlingSettings;
   private final RetrySettings retrySettings;
@@ -149,7 +150,8 @@ public static long getApiMaxRequestBytes() {
   private ScheduledFuture currentAlarmFuture;
 
   private Publisher(Builder builder) throws IOException {
-    topic = builder.topic;
+    topicName = builder.topicName;
+    cachedTopicNameString = topicName.toString();
 
     this.bundlingSettings = builder.bundlingSettings;
     this.retrySettings = builder.retrySettings;
@@ -198,8 +200,8 @@ public void close() throws IOException {
   }
 
   /** Topic which the publisher publishes to. */
-  public String getTopic() {
-    return topic;
+  public TopicName getTopicName() {
+    return topicName;
   }
 
   /**
@@ -333,7 +335,7 @@ private void publishAllOutstanding() {
 
   private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) {
     PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
-    publishRequest.setTopic(topic);
+    publishRequest.setTopic(cachedTopicNameString);
     for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) {
       publishRequest.addMessages(outstandingPublish.message);
     }
@@ -528,6 +530,11 @@ interface LongRandom {
     long nextLong(long least, long bound);
   }
 
+  /** Constructs a new {@link Builder} using the given topic. */
+  public static Builder newBuilder(TopicName topicName) {
+    return new Builder(topicName);
+  }
+
   /** A builder of {@link Publisher}s. */
   public static final class Builder {
     static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
@@ -569,7 +576,7 @@ public long nextLong(long least, long bound) {
             .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
             .build();
 
-    String topic;
+    TopicName topicName;
 
     // Bundling options
     BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
@@ -588,13 +595,8 @@ public long nextLong(long least, long bound) {
 
     ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
 
-    /** Constructs a new {@link Builder} using the given topic. */
-    public static Builder newBuilder(TopicName topic) {
-      return new Builder(topic.toString());
-    }
-
-    Builder(String topic) {
-      this.topic = Preconditions.checkNotNull(topic);
+    private Builder(TopicName topic) {
+      this.topicName = Preconditions.checkNotNull(topic);
     }
 
     /**
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
index e4f7a4a6636e..cdbc14fef3c6 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
@@ -32,8 +32,6 @@
 import com.google.common.util.concurrent.Service;
 import com.google.pubsub.v1.SubscriptionName;
 import io.grpc.ManagedChannelBuilder;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeoutException;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.grpc.netty.GrpcSslContexts;
@@ -43,9 +41,11 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,7 +88,7 @@
  * }
  *
  * Subscriber subscriber =
- *     Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
+ *     Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
  *         .setMaxBundleAcks(100)
  *         .build();
  *
@@ -123,19 +123,33 @@ private Subscriber(Builder builder) throws IOException {
     impl = new SubscriberImpl(builder);
   }
 
+  /**
+   * Constructs a new {@link Builder}.
+   *
+   * 

Once {@link #build()} is called a gRPC stub will be created for use of the {@link + * Subscriber}. + * + * @param subscription Cloud Pub/Sub subscription to bind the subscriber to + * @param receiver an implementation of {@link MessageReceiver} used to process the received + * messages + */ + public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) { + return new Builder(subscription, receiver); + } + /** Subscription which the subscriber is subscribed to. */ - public String getSubscription() { - return impl.getSubscription(); + public SubscriptionName getSubscriptionName() { + return impl.subscriptionName; } /** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */ public Duration getAckExpirationPadding() { - return impl.getAckExpirationPadding(); + return impl.ackExpirationPadding; } /** The flow control settings the Subscriber is configured with. */ public FlowController.Settings getFlowControlSettings() { - return impl.getFlowControlSettings(); + return impl.flowControlSettings; } public void addListener(final SubscriberListener listener, Executor executor) { @@ -249,7 +263,8 @@ public void terminated(State from) {} private static class SubscriberImpl extends AbstractService { private static final Logger logger = LoggerFactory.getLogger(Subscriber.class); - private final String subscription; + private final SubscriptionName subscriptionName; + private final String cachedSubscriptionNameString; private final FlowController.Settings flowControlSettings; private final Duration ackExpirationPadding; private final ScheduledExecutorService executor; @@ -270,7 +285,8 @@ private static class SubscriberImpl extends AbstractService { private SubscriberImpl(Builder builder) throws IOException { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; - subscription = builder.subscription; + subscriptionName = builder.subscriptionName; + cachedSubscriptionNameString = subscriptionName.toString(); ackExpirationPadding = builder.ackExpirationPadding; streamAckDeadlineSeconds = Math.max( @@ -340,7 +356,7 @@ private void startStreamingConnections() { for (int i = 0; i < numChannels; i++) { streamingSubscriberConnections.add( new StreamingSubscriberConnection( - subscription, + cachedSubscriptionNameString, credentials, receiver, ackExpirationPadding, @@ -412,7 +428,7 @@ private void startPollingConnections() { for (int i = 0; i < numChannels; i++) { pollingSubscriberConnections.add( new PollingSubscriberConnection( - subscription, + cachedSubscriptionNameString, credentials, receiver, ackExpirationPadding, @@ -496,21 +512,6 @@ public void run() { throw new IllegalStateException(e); } } - - /** Subscription which the subscriber is subscribed to. */ - public String getSubscription() { - return subscription; - } - - /** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */ - public Duration getAckExpirationPadding() { - return ackExpirationPadding; - } - - /** The flow control settings the Subscriber is configured with. */ - public FlowController.Settings getFlowControlSettings() { - return flowControlSettings; - } } /** Builder of {@link Subscriber Subscribers}. */ @@ -526,7 +527,7 @@ public static final class Builder { * Runtime.getRuntime().availableProcessors()) .build(); - String subscription; + SubscriptionName subscriptionName; Optional credentials = Optional.absent(); MessageReceiver receiver; @@ -539,22 +540,8 @@ public static final class Builder { Optional.absent(); Optional clock = Optional.absent(); - /** - * Constructs a new {@link Builder}. - * - *

Once {@link #build()} is called a gRPC stub will be created for use of the {@link - * Subscriber}. - * - * @param subscription Cloud Pub/Sub subscription to bind the subscriber to - * @param receiver an implementation of {@link MessageReceiver} used to process the received - * messages - */ - public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) { - return new Builder(subscription.toString(), receiver); - } - - Builder(String subscription, MessageReceiver receiver) { - this.subscription = subscription; + Builder(SubscriptionName subscriptionName, MessageReceiver receiver) { + this.subscriptionName = subscriptionName; this.receiver = receiver; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index 165079e584b9..828c3769a5db 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -352,7 +352,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce public void testPublisherGetters() throws Exception { FakeCredentials credentials = new FakeCredentials(); - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); + Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); builder.setChannelBuilder(testChannelBuilder); builder.setCredentials(credentials); builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR); @@ -370,7 +370,7 @@ public void testPublisherGetters() throws Exception { .build()); Publisher publisher = builder.build(); - assertEquals(TEST_TOPIC.toString(), publisher.getTopic()); + assertEquals(TEST_TOPIC, publisher.getTopicName()); assertEquals(10, (long) publisher.getBundlingSettings().getRequestByteThreshold()); assertEquals(new Duration(11), publisher.getBundlingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBundlingSettings().getElementCountThreshold()); @@ -384,8 +384,8 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); - assertEquals(TEST_TOPIC.toString(), builder.topic); + Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); + assertEquals(TEST_TOPIC, builder.topicName); assertEquals(Optional.absent(), builder.channelBuilder); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); assertFalse(builder.failOnFlowControlLimits); @@ -404,7 +404,7 @@ public void testBuilderParametersAndDefaults() { @Test public void testBuilderInvalidArguments() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); + Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); try { builder.setChannelBuilder(null); @@ -601,7 +601,7 @@ public void testBuilderInvalidArguments() { } private Builder getTestPublisherBuilder() { - return Publisher.Builder.newBuilder(TEST_TOPIC) + return Publisher.newBuilder(TEST_TOPIC) .setCredentials(testCredentials) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelBuilder(testChannelBuilder) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index aead67ee7ede..853d9c90bb44 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -29,7 +29,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Service.State; import com.google.common.util.concurrent.SettableFuture; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullResponse; @@ -467,7 +466,7 @@ private void sendMessages(Iterable ackIds) throws InterruptedException { } private Builder getTestSubscriberBuilder(MessageReceiver receiver) { - return Subscriber.Builder.newBuilder(TEST_SUBSCRIPTION, receiver) + return Subscriber.newBuilder(TEST_SUBSCRIPTION, receiver) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setCredentials(testCredentials) .setChannelBuilder(testChannelBuilder)