diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index b53b8a497371..e55fb6179fd2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -221,7 +221,7 @@ public static ListOption pageToken(String pageToken) { */ Future> listTopicsAsync(ListOption... options); - Publisher getPublisher(TopicInfo topic) throws IOException; + Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException; /** * Creates a new subscription. diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index e0e03af5c971..a3ac4924f299 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -286,13 +286,13 @@ public Future> listTopicsAsync(ListOption... options) { } @Override - public Publisher getPublisher(TopicInfo topic) throws IOException { - // TODO(pongad): Provide a way to pass in the rest of the options. - String topicName = - PublisherClient.formatTopicName(getOptions().getProjectId(), topic.getName()); - return Publisher.Builder.newBuilder(topicName) - .setCredentials(getOptions().getCredentials()) - .build(); + public Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException { + String topicName = PublisherClient.formatTopicName(getOptions().getProjectId(), topic); + try { + return new PublisherImpl(topicName, getOptions(), settings); + } catch (IOException e) { + throw new PubSubException(e, false); + } } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index ac8262d1af97..1b58b86294bb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -16,13 +16,13 @@ package com.google.cloud.pubsub; +import com.google.api.gax.grpc.ChannelProvider; import com.google.cloud.GrpcServiceOptions; import com.google.cloud.pubsub.spi.DefaultPubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpcFactory; import com.google.cloud.pubsub.spi.v1.PublisherSettings; import com.google.common.collect.ImmutableSet; - import java.io.IOException; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -103,6 +103,11 @@ protected ExecutorFactory getExecutorFactory() { return super.getExecutorFactory(); } + @Override + protected ChannelProvider getChannelProvider() { + return super.getChannelProvider(); + } + @Override protected PubSubFactory getDefaultServiceFactory() { return DefaultPubSubFactory.INSTANCE; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index 9013b94bef43..28ebca802948 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -18,15 +18,12 @@ import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; -import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; +import com.google.auto.value.AutoValue; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.pubsub.v1.PubsubMessage; -import io.grpc.ManagedChannelBuilder; -import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; import org.joda.time.Duration; /** @@ -79,28 +76,6 @@ * */ public interface Publisher { - String PUBSUB_API_ADDRESS = "pubsub.googleapis.com"; - String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub"; - - // API limits. - int MAX_BUNDLE_MESSAGES = 1000; - int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) - - // Meaningful defaults. - long DEFAULT_MAX_BUNDLE_MESSAGES = 100L; - long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB - Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms - Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds - Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds - Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds - - BundlingSettings DEFAULT_BUNDLING_SETTINGS = - BundlingSettings.newBuilder() - .setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION) - .setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES) - .setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES) - .build(); - /** Topic to which the publisher publishes to. */ String getTopic(); @@ -161,130 +136,84 @@ public interface Publisher { */ void shutdown(); - /** A builder of {@link Publisher}s. */ - final class Builder { - String topic; + @AutoValue + public abstract class Settings { + static final String PUBSUB_API_ADDRESS = "pubsub.googleapis.com"; + static final String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub"; - // Bundling options - BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS; + // API limits. + static final int MAX_BUNDLE_MESSAGES = 1000; + static final int MAX_BUNDLE_BYTES = + 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) - // Client-side flow control options - FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT; - boolean failOnFlowControlLimits = false; + // Meaningful defaults. + static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L; + static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB + static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms + static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds + static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds + static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds - // Send bundle deadline - Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; + static final BundlingSettings DEFAULT_BUNDLING_SETTINGS = + BundlingSettings.newBuilder() + .setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION) + .setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES) + .setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES) + .build(); - // RPC options - Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT; + public static Settings DEFAULT = newBuilder().build(); - // Channels and credentials - Optional userCredentials = Optional.absent(); - Optional>> channelBuilder = - Optional.absent(); + public abstract BundlingSettings getBundlingSettings(); - Optional executor = Optional.absent(); + public abstract FlowController.Settings getFlowControlSettings(); - /** Constructs a new {@link Builder} using the given topic. */ - public static Builder newBuilder(String topic) { - return new Builder(topic); - } + public abstract boolean getFailOnFlowControlLimits(); - Builder(String topic) { - this.topic = Preconditions.checkNotNull(topic); - } + abstract Duration getSendBundleDeadline(); - /** - * Credentials to authenticate with. - * - *

Must be properly scoped for accessing Cloud Pub/Sub APIs. - */ - public Builder setCredentials(Credentials userCredentials) { - this.userCredentials = Optional.of(Preconditions.checkNotNull(userCredentials)); - return this; - } + abstract Duration getRequestTimeout(); - /** - * ManagedChannelBuilder to use to create Channels. - * - *

Must point at Cloud Pub/Sub endpoint. - */ - public Builder setChannelBuilder( - ManagedChannelBuilder> channelBuilder) { - this.channelBuilder = - Optional.>>of( - Preconditions.checkNotNull(channelBuilder)); - return this; + public static Builder newBuilder() { + return new AutoValue_Publisher_Settings.Builder() + .setFlowControlSettings(FlowController.Settings.DEFAULT) + .setFailOnFlowControlLimits(false) + .setSendBundleDeadline(MIN_SEND_BUNDLE_DURATION) + .setRequestTimeout(DEFAULT_REQUEST_TIMEOUT) + .setBundlingSettings(DEFAULT_BUNDLING_SETTINGS); } - // Bundling options - public Builder setBundlingSettings(BundlingSettings bundlingSettings) { - Preconditions.checkNotNull(bundlingSettings); - Preconditions.checkNotNull(bundlingSettings.getElementCountThreshold()); - Preconditions.checkArgument(bundlingSettings.getElementCountThreshold() > 0); - Preconditions.checkNotNull(bundlingSettings.getRequestByteThreshold()); - Preconditions.checkArgument(bundlingSettings.getRequestByteThreshold() > 0); - Preconditions.checkNotNull(bundlingSettings.getDelayThreshold()); - Preconditions.checkArgument(bundlingSettings.getDelayThreshold().getMillis() > 0); + @AutoValue.Builder + abstract static class Builder { + public abstract Builder setBundlingSettings(BundlingSettings value); - Preconditions.checkArgument( - bundlingSettings.getElementCountLimit() == null, - "elementCountLimit option not honored by current implementation"); - Preconditions.checkArgument( - bundlingSettings.getRequestByteLimit() == null, - "requestByteLimit option not honored by current implementation"); - Preconditions.checkArgument( - bundlingSettings.getBlockingCallCountThreshold() == null, - "blockingCallCountThreshold option not honored by current implementation"); + public abstract Builder setFlowControlSettings(FlowController.Settings value); - this.bundlingSettings = bundlingSettings; - return this; - } + public abstract Builder setFailOnFlowControlLimits(boolean value); - // Flow control options + abstract Builder setSendBundleDeadline(Duration value); - /** Sets the flow control settings. */ - public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) { - this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); - return this; - } - - /** - * Whether to fail publish when reaching any of the flow control limits, with either a {@link - * RequestByteMaxOutstandingReachedException} or {@link - * ElementCountMaxOutstandingReachedException} as appropriate. - * - *

If set to false, then publish operations will block the current thread until the - * outstanding requests go under the limits. - */ - public Builder setFailOnFlowControlLimits(boolean fail) { - failOnFlowControlLimits = fail; - return this; - } + abstract Builder setRequestTimeout(Duration value); - /** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */ - public Builder setSendBundleDeadline(Duration deadline) { - Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0); - sendBundleDeadline = deadline; - return this; - } + abstract Settings autoBuild(); - // Runtime options - /** Time to wait for a publish call to return from the server. */ - public Builder setRequestTimeout(Duration timeout) { - Preconditions.checkArgument(timeout.compareTo(MIN_REQUEST_TIMEOUT) >= 0); - requestTimeout = timeout; - return this; - } - - /** Gives the ability to set a custom executor to be used by the library. */ - public Builder setExecutor(ScheduledExecutorService executor) { - this.executor = Optional.of(Preconditions.checkNotNull(executor)); - return this; - } + public Settings build() { + Settings settings = autoBuild(); + Preconditions.checkArgument( + settings.getBundlingSettings().getElementCountLimit() == null, + "elementCountLimit option not honored by current implementation"); + Preconditions.checkArgument( + settings.getBundlingSettings().getRequestByteLimit() == null, + "requestByteLimit option not honored by current implementation"); + Preconditions.checkArgument( + settings.getBundlingSettings().getBlockingCallCountThreshold() == null, + "blockingCallCountThreshold option not honored by current implementation"); - public Publisher build() throws IOException { - return new PublisherImpl(this); + Preconditions.checkArgument( + settings.getRequestTimeout().compareTo(MIN_REQUEST_TIMEOUT) >= 0); + Preconditions.checkArgument( + settings.getSendBundleDeadline().compareTo(MIN_SEND_BUNDLE_DURATION) >= 0); + return settings; + } } } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java index d9e375b2dedd..658fb6028861 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java @@ -17,7 +17,6 @@ package com.google.cloud.pubsub; import com.google.api.gax.bundling.FlowController; -import com.google.auth.oauth2.GoogleCredentials; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; @@ -25,24 +24,18 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; import com.google.pubsub.v1.PubsubMessage; import io.grpc.CallCredentials; -import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; -import io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; import java.io.IOException; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -63,6 +56,7 @@ final class PublisherImpl implements Publisher { private static final Logger logger = LoggerFactory.getLogger(PublisherImpl.class); private final String topic; + private final PubSubOptions psOptions; private final long maxBundleMessages; private final long maxBundleBytes; @@ -79,7 +73,7 @@ final class PublisherImpl implements Publisher { private final AtomicBoolean activeAlarm; private final FlowController flowController; - private final Channel[] channels; + private final ManagedChannel[] channels; private final AtomicLong channelIndex; private final CallCredentials credentials; private final Duration requestTimeout; @@ -90,55 +84,42 @@ final class PublisherImpl implements Publisher { private final Duration sendBundleDeadline; private ScheduledFuture currentAlarmFuture; - PublisherImpl(Builder builder) throws IOException { - topic = builder.topic; - - maxBundleMessages = builder.bundlingSettings.getElementCountThreshold(); - maxBundleBytes = builder.bundlingSettings.getRequestByteThreshold(); - maxBundleDuration = builder.bundlingSettings.getDelayThreshold(); + PublisherImpl(String topic, PubSubOptions psOptions, Settings settings) throws IOException { + this.topic = topic; + this.psOptions = psOptions; + maxBundleMessages = settings.getBundlingSettings().getElementCountThreshold(); + maxBundleBytes = settings.getBundlingSettings().getRequestByteThreshold(); + maxBundleDuration = settings.getBundlingSettings().getDelayThreshold(); hasBundlingBytes = maxBundleBytes > 0; - flowControlSettings = builder.flowControlSettings; - failOnFlowControlLimits = builder.failOnFlowControlLimits; + flowControlSettings = settings.getFlowControlSettings(); + failOnFlowControlLimits = settings.getFailOnFlowControlLimits(); this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); - sendBundleDeadline = builder.sendBundleDeadline; + sendBundleDeadline = settings.getSendBundleDeadline(); - requestTimeout = builder.requestTimeout; + requestTimeout = settings.getRequestTimeout(); messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); - int numCores = Math.max(1, Runtime.getRuntime().availableProcessors()); - executor = - builder.executor.isPresent() - ? builder.executor.get() - : Executors.newScheduledThreadPool( - numCores * DEFAULT_MIN_THREAD_POOL_SIZE, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("cloud-pubsub-publisher-thread-%d") - .build()); - channels = new Channel[numCores]; - channelIndex = new AtomicLong(0); - for (int i = 0; i < numCores; i++) { - channels[i] = - builder.channelBuilder.isPresent() - ? builder.channelBuilder.get().build() - : NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .executor(executor) - .build(); - } - credentials = - MoreCallCredentials.from( - builder.userCredentials.isPresent() - ? builder.userCredentials.get() - : GoogleCredentials.getApplicationDefault() - .createScoped(Collections.singletonList(PUBSUB_API_SCOPE))); shutdown = new AtomicBoolean(false); messagesWaiter = new MessagesWaiter(); + + int numCores = Runtime.getRuntime().availableProcessors(); + executor = psOptions.getExecutorFactory().get(); + + channels = new ManagedChannel[numCores]; + for (int i = 0; i < numCores; i++) { + if (psOptions.getChannelProvider().needsExecutor()) { + channels[i] = psOptions.getChannelProvider().getChannel(executor); + } else { + channels[i] = psOptions.getChannelProvider().getChannel(); + } + } + channelIndex = new AtomicLong(0); + + credentials = MoreCallCredentials.from(psOptions.getCredentials()); } @Override @@ -421,6 +402,13 @@ public void shutdown() { } publishAllOustanding(); messagesWaiter.waitNoMessages(); + + if (psOptions.getChannelProvider().shouldAutoClose()) { + for (ManagedChannel channel : channels) { + channel.shutdown(); + } + } + psOptions.getExecutorFactory().release(executor); } private static long computeNextBackoffDelayMs(OutstandingBundle outstandingBundle) { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index bd906b33ca74..97b649fee54f 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -25,13 +25,15 @@ import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; -import com.google.cloud.pubsub.Publisher.Builder; +import com.google.api.gax.grpc.ChannelProvider; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; @@ -39,6 +41,7 @@ import io.grpc.internal.ServerImpl; import io.grpc.stub.StreamObserver; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.joda.time.Duration; @@ -57,6 +60,7 @@ @RunWith(JUnit4.class) public class PublisherImplTest { + private static final String TEST_PROJECT_NAME = "test-project"; private static final String TEST_TOPIC = "projects/test-project/topics/test-topic"; private static InProcessChannelBuilder testChannelBuilder; @@ -100,15 +104,18 @@ public static void tearDownClass() throws Exception { @Test public void testPublishByDuration() throws Exception { Publisher publisher = - getTestPublisherBuilder() - // To demonstrate that reaching duration will trigger publish - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.standardSeconds(5)) - .setElementCountThreshold(10) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() + // To demonstrate that reaching duration will trigger publish + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setDelayThreshold(Duration.standardSeconds(5)) + .setElementCountThreshold(10) + .build()) + .build()); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); @@ -132,14 +139,17 @@ public void testPublishByDuration() throws Exception { @Test public void testPublishByNumBundledMessages() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(2) - .setDelayThreshold(Duration.standardSeconds(100)) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(2) + .setDelayThreshold(Duration.standardSeconds(100)) + .build()) + .build()); testPublisherServiceImpl .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) @@ -171,14 +181,17 @@ public void testPublishByNumBundledMessages() throws Exception { @Test public void testSinglePublishByNumBytes() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(2) - .setDelayThreshold(Duration.standardSeconds(100)) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(2) + .setDelayThreshold(Duration.standardSeconds(100)) + .build()) + .build()); testPublisherServiceImpl .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) @@ -205,15 +218,18 @@ public void testSinglePublishByNumBytes() throws Exception { @Test public void testPublishMixedSizeAndDuration() throws Exception { Publisher publisher = - getTestPublisherBuilder() - // To demonstrate that reaching duration will trigger publish - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(2) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() + // To demonstrate that reaching duration will trigger publish + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(2) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); @@ -253,15 +269,17 @@ private ListenableFuture sendTestMessage(Publisher publisher, String dat @Test public void testPublishFailureRetries() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(1) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish + new PublisherImpl( + TEST_TOPIC, + getTestOptions(Executors.newSingleThreadScheduledExecutor()), + Publisher.Settings.newBuilder() + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(1) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -277,16 +295,18 @@ public void testPublishFailureRetries() throws Exception { @Test(expected = Throwable.class) public void testPublishFailureRetries_exceededsRetryDuration() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBundleDeadline(Duration.standardSeconds(10)) - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(1) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish + new PublisherImpl( + TEST_TOPIC, + getTestOptions(Executors.newSingleThreadScheduledExecutor()), + Publisher.Settings.newBuilder() + .setSendBundleDeadline(Duration.standardSeconds(10)) + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(1) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -306,16 +326,18 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception @Test(expected = ExecutionException.class) public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBundleDeadline(Duration.standardSeconds(10)) - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(1) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish + new PublisherImpl( + TEST_TOPIC, + getTestOptions(Executors.newSingleThreadScheduledExecutor()), + Publisher.Settings.newBuilder() + .setSendBundleDeadline(Duration.standardSeconds(10)) + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(1) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -331,13 +353,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce @Test public void testPublisherGetters() throws Exception { - FakeCredentials credentials = new FakeCredentials(); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); - builder.setChannelBuilder(testChannelBuilder); - builder.setCredentials(credentials); - builder.setExecutor(executor); + Publisher.Settings.Builder builder = Publisher.Settings.newBuilder(); builder.setFailOnFlowControlLimits(true); builder.setBundlingSettings( BundlingSettings.newBuilder() @@ -352,7 +368,8 @@ public void testPublisherGetters() throws Exception { .build()); builder.setRequestTimeout(new Duration(15)); builder.setSendBundleDeadline(new Duration(16000)); - Publisher publisher = builder.build(); + Publisher publisher = + new PublisherImpl(TEST_TOPIC, getTestOptions(fakeExecutor), builder.build()); assertEquals(TEST_TOPIC, publisher.getTopic()); assertEquals(10, publisher.getMaxBundleBytes()); @@ -365,193 +382,102 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); - assertEquals(TEST_TOPIC, builder.topic); - assertEquals(Optional.absent(), builder.channelBuilder); - assertEquals(Optional.absent(), builder.executor); - assertFalse(builder.failOnFlowControlLimits); + Publisher.Settings settings = Publisher.Settings.newBuilder().autoBuild(); + assertFalse(settings.getFailOnFlowControlLimits()); assertEquals( - Publisher.DEFAULT_MAX_BUNDLE_BYTES, - builder.bundlingSettings.getRequestByteThreshold().longValue()); + Publisher.Settings.DEFAULT_MAX_BUNDLE_BYTES, + settings.getBundlingSettings().getRequestByteThreshold().longValue()); assertEquals( - Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.bundlingSettings.getDelayThreshold()); + Publisher.Settings.DEFAULT_MAX_BUNDLE_DURATION, + settings.getBundlingSettings().getDelayThreshold()); assertEquals( - Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, - builder.bundlingSettings.getElementCountThreshold().longValue()); - assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings); - assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout); - assertEquals(Publisher.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline); - assertEquals(Optional.absent(), builder.userCredentials); + Publisher.Settings.DEFAULT_MAX_BUNDLE_MESSAGES, + settings.getBundlingSettings().getElementCountThreshold().longValue()); + assertEquals(FlowController.Settings.DEFAULT, settings.getFlowControlSettings()); + assertEquals(Publisher.Settings.DEFAULT_REQUEST_TIMEOUT, settings.getRequestTimeout()); + assertEquals(Publisher.Settings.MIN_SEND_BUNDLE_DURATION, settings.getSendBundleDeadline()); } @Test public void testBuilderInvalidArguments() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); - - try { - builder.setChannelBuilder(null); - fail("Should have thrown an IllegalArgumentException"); - } catch (NullPointerException expected) { - // Expected - } - - try { - builder.setCredentials(null); - fail("Should have thrown an IllegalArgumentException"); - } catch (NullPointerException expected) { - // Expected - } + Publisher.Settings.newBuilder().build(); + Publisher.Settings.newBuilder() + .setRequestTimeout(Publisher.Settings.MIN_REQUEST_TIMEOUT) + .build(); try { - builder.setExecutor(null); - fail("Should have thrown an IllegalArgumentException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setRequestByteThreshold((Long) null) - .build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(0).build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(-1).build()); + Publisher.Settings.newBuilder() + .setRequestTimeout(Publisher.Settings.MIN_REQUEST_TIMEOUT.minus(1)) + .build(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(new Duration(1)).build()); - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(null).build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } + Publisher.Settings.newBuilder() + .setSendBundleDeadline(Publisher.Settings.MIN_SEND_BUNDLE_DURATION) + .build(); try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setDelayThreshold(new Duration(-1)) - .build()); + Publisher.Settings.newBuilder() + .setSendBundleDeadline(Publisher.Settings.MIN_SEND_BUNDLE_DURATION.minus(1)) + .build(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } + } - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build()); - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold((Long) null) - .build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(0).build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(-1).build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } + private PubSubOptions getTestOptions(final ScheduledExecutorService executor) { + return new TestPubSubOptions( + PubSubOptions.newBuilder() + .setProjectId(TEST_PROJECT_NAME) + .setCredentials(testCredentials) + .setExecutorFactory( + new ExecutorFactory() { + @Override + public ScheduledExecutorService get() { + return executor; + } + + @Override + public void release(ScheduledExecutorService executor) { + // do nothing + } + }), + new ChannelProvider() { + @Override + public boolean shouldAutoClose() { + return true; + } + + @Override + public boolean needsExecutor() { + return false; + } + + @Override + public ManagedChannel getChannel() { + return testChannelBuilder.build(); + } + + @Override + public ManagedChannel getChannel(Executor executor) { + throw new IllegalStateException( + "getChannel(Executor) called when needsExecutor() is false."); + } + }); + } - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingRequestBytes(Optional.of(1)) - .build()); - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingRequestBytes(Optional.of(0)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingRequestBytes(Optional.of(-1)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } + private static class TestPubSubOptions extends PubSubOptions { + private final ChannelProvider channelProvider; - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingElementCount(Optional.of(1)) - .build()); - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingElementCount(Optional.of(0)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingElementCount(Optional.of(-1)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected + TestPubSubOptions(PubSubOptions.Builder b, ChannelProvider c) { + super(b); + this.channelProvider = c; } - builder.setRequestTimeout(Publisher.MIN_REQUEST_TIMEOUT); - try { - builder.setRequestTimeout(Publisher.MIN_REQUEST_TIMEOUT.minus(1)); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected + @Override + protected ChannelProvider getChannelProvider() { + return channelProvider; } - builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION); - try { - builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION.minus(1)); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - } - - private Builder getTestPublisherBuilder() { - return Publisher.Builder.newBuilder(TEST_TOPIC) - .setCredentials(testCredentials) - .setExecutor(fakeExecutor) - .setChannelBuilder(testChannelBuilder); } }