From 2beb34efab4e5a45dca90b4d5755bf10000aeccb Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 9 Jan 2017 15:40:53 +1100 Subject: [PATCH 1/2] add Publisher.Settings This class is used to configure Publisher. User-visible options are made public. Options that should be automatically populated, like user credentials, are left package-private for now. --- .../java/com/google/cloud/pubsub/PubSub.java | 2 +- .../com/google/cloud/pubsub/PubSubImpl.java | 14 +- .../com/google/cloud/pubsub/Publisher.java | 219 +++++------ .../google/cloud/pubsub/PublisherImpl.java | 35 +- .../cloud/pubsub/PublisherImplTest.java | 343 ++++++------------ 5 files changed, 226 insertions(+), 387 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index b53b8a497371..e55fb6179fd2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -221,7 +221,7 @@ public static ListOption pageToken(String pageToken) { */ Future> listTopicsAsync(ListOption... options); - Publisher getPublisher(TopicInfo topic) throws IOException; + Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException; /** * Creates a new subscription. diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index e0e03af5c971..45d309012d8d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -286,13 +286,13 @@ public Future> listTopicsAsync(ListOption... options) { } @Override - public Publisher getPublisher(TopicInfo topic) throws IOException { - // TODO(pongad): Provide a way to pass in the rest of the options. - String topicName = - PublisherClient.formatTopicName(getOptions().getProjectId(), topic.getName()); - return Publisher.Builder.newBuilder(topicName) - .setCredentials(getOptions().getCredentials()) - .build(); + public Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException { + String topicName = PublisherClient.formatTopicName(getOptions().getProjectId(), topic); + try { + return new PublisherImpl(topicName, settings); + } catch (IOException e) { + throw new PubSubException(e, false); + } } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index 9013b94bef43..b6385d73b127 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -20,12 +20,12 @@ import com.google.api.gax.grpc.BundlingSettings; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; +import com.google.auto.value.AutoValue; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.pubsub.v1.PubsubMessage; import io.grpc.ManagedChannelBuilder; -import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import org.joda.time.Duration; @@ -79,28 +79,6 @@ * */ public interface Publisher { - String PUBSUB_API_ADDRESS = "pubsub.googleapis.com"; - String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub"; - - // API limits. - int MAX_BUNDLE_MESSAGES = 1000; - int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) - - // Meaningful defaults. - long DEFAULT_MAX_BUNDLE_MESSAGES = 100L; - long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB - Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms - Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds - Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds - Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds - - BundlingSettings DEFAULT_BUNDLING_SETTINGS = - BundlingSettings.newBuilder() - .setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION) - .setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES) - .setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES) - .build(); - /** Topic to which the publisher publishes to. */ String getTopic(); @@ -161,130 +139,115 @@ public interface Publisher { */ void shutdown(); - /** A builder of {@link Publisher}s. */ - final class Builder { - String topic; + @AutoValue + public abstract class Settings { + static final String PUBSUB_API_ADDRESS = "pubsub.googleapis.com"; + static final String PUBSUB_API_SCOPE = "https://www.googleapis.com/auth/pubsub"; - // Bundling options - BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS; + // API limits. + static final int MAX_BUNDLE_MESSAGES = 1000; + static final int MAX_BUNDLE_BYTES = + 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) - // Client-side flow control options - FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT; - boolean failOnFlowControlLimits = false; + // Meaningful defaults. + static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L; + static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB + static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms + static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds + static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds + static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds - // Send bundle deadline - Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; + static final BundlingSettings DEFAULT_BUNDLING_SETTINGS = + BundlingSettings.newBuilder() + .setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION) + .setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES) + .setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES) + .build(); - // RPC options - Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT; + public static Settings DEFAULT = newBuilder().build(); - // Channels and credentials - Optional userCredentials = Optional.absent(); - Optional>> channelBuilder = - Optional.absent(); + public abstract BundlingSettings getBundlingSettings(); - Optional executor = Optional.absent(); + public abstract FlowController.Settings getFlowControlSettings(); - /** Constructs a new {@link Builder} using the given topic. */ - public static Builder newBuilder(String topic) { - return new Builder(topic); - } + public abstract boolean getFailOnFlowControlLimits(); - Builder(String topic) { - this.topic = Preconditions.checkNotNull(topic); - } + abstract Duration getSendBundleDeadline(); - /** - * Credentials to authenticate with. - * - *

Must be properly scoped for accessing Cloud Pub/Sub APIs. - */ - public Builder setCredentials(Credentials userCredentials) { - this.userCredentials = Optional.of(Preconditions.checkNotNull(userCredentials)); - return this; - } + abstract Duration getRequestTimeout(); - /** - * ManagedChannelBuilder to use to create Channels. - * - *

Must point at Cloud Pub/Sub endpoint. - */ - public Builder setChannelBuilder( - ManagedChannelBuilder> channelBuilder) { - this.channelBuilder = - Optional.>>of( - Preconditions.checkNotNull(channelBuilder)); - return this; - } + abstract Optional getUserCredentials(); - // Bundling options - public Builder setBundlingSettings(BundlingSettings bundlingSettings) { - Preconditions.checkNotNull(bundlingSettings); - Preconditions.checkNotNull(bundlingSettings.getElementCountThreshold()); - Preconditions.checkArgument(bundlingSettings.getElementCountThreshold() > 0); - Preconditions.checkNotNull(bundlingSettings.getRequestByteThreshold()); - Preconditions.checkArgument(bundlingSettings.getRequestByteThreshold() > 0); - Preconditions.checkNotNull(bundlingSettings.getDelayThreshold()); - Preconditions.checkArgument(bundlingSettings.getDelayThreshold().getMillis() > 0); - - Preconditions.checkArgument( - bundlingSettings.getElementCountLimit() == null, - "elementCountLimit option not honored by current implementation"); - Preconditions.checkArgument( - bundlingSettings.getRequestByteLimit() == null, - "requestByteLimit option not honored by current implementation"); - Preconditions.checkArgument( - bundlingSettings.getBlockingCallCountThreshold() == null, - "blockingCallCountThreshold option not honored by current implementation"); - - this.bundlingSettings = bundlingSettings; - return this; - } + abstract Optional>> + getChannelBuilder(); - // Flow control options + abstract Optional getExecutor(); - /** Sets the flow control settings. */ - public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) { - this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); - return this; + public static Builder newBuilder() { + return new AutoValue_Publisher_Settings.Builder() + .setFlowControlSettings(FlowController.Settings.DEFAULT) + .setFailOnFlowControlLimits(false) + .setSendBundleDeadline(MIN_SEND_BUNDLE_DURATION) + .setRequestTimeout(DEFAULT_REQUEST_TIMEOUT) + .setUserCredentials(Optional.absent()) + .setChannelBuilder( + Optional.>>absent()) + .setExecutor(Optional.absent()) + .setBundlingSettings(DEFAULT_BUNDLING_SETTINGS); } - /** - * Whether to fail publish when reaching any of the flow control limits, with either a {@link - * RequestByteMaxOutstandingReachedException} or {@link - * ElementCountMaxOutstandingReachedException} as appropriate. - * - *

If set to false, then publish operations will block the current thread until the - * outstanding requests go under the limits. - */ - public Builder setFailOnFlowControlLimits(boolean fail) { - failOnFlowControlLimits = fail; - return this; - } + @AutoValue.Builder + abstract static class Builder { + public abstract Builder setBundlingSettings(BundlingSettings value); - /** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */ - public Builder setSendBundleDeadline(Duration deadline) { - Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0); - sendBundleDeadline = deadline; - return this; - } + public abstract Builder setFlowControlSettings(FlowController.Settings value); - // Runtime options - /** Time to wait for a publish call to return from the server. */ - public Builder setRequestTimeout(Duration timeout) { - Preconditions.checkArgument(timeout.compareTo(MIN_REQUEST_TIMEOUT) >= 0); - requestTimeout = timeout; - return this; - } + public abstract Builder setFailOnFlowControlLimits(boolean value); - /** Gives the ability to set a custom executor to be used by the library. */ - public Builder setExecutor(ScheduledExecutorService executor) { - this.executor = Optional.of(Preconditions.checkNotNull(executor)); - return this; - } + abstract Builder setSendBundleDeadline(Duration value); + + abstract Builder setRequestTimeout(Duration value); + + abstract Builder setUserCredentials(Optional value); + + Builder setUserCredentials(Credentials value) { + return setUserCredentials(Optional.of(value)); + } + + abstract Builder setChannelBuilder( + Optional>> value); + + Builder setChannelBuilder(ManagedChannelBuilder> value) { + return setChannelBuilder( + Optional.>>of(value)); + } + + abstract Builder setExecutor(Optional value); + + Builder setExecutor(ScheduledExecutorService value) { + return setExecutor(Optional.of(value)); + } + + abstract Settings autoBuild(); + + public Settings build() { + Settings settings = autoBuild(); + Preconditions.checkArgument( + settings.getBundlingSettings().getElementCountLimit() == null, + "elementCountLimit option not honored by current implementation"); + Preconditions.checkArgument( + settings.getBundlingSettings().getRequestByteLimit() == null, + "requestByteLimit option not honored by current implementation"); + Preconditions.checkArgument( + settings.getBundlingSettings().getBlockingCallCountThreshold() == null, + "blockingCallCountThreshold option not honored by current implementation"); - public Publisher build() throws IOException { - return new PublisherImpl(this); + Preconditions.checkArgument( + settings.getRequestTimeout().compareTo(MIN_REQUEST_TIMEOUT) >= 0); + Preconditions.checkArgument( + settings.getSendBundleDeadline().compareTo(MIN_SEND_BUNDLE_DURATION) >= 0); + return settings; + } } } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java index d9e375b2dedd..78eb097d4ba5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java @@ -90,29 +90,28 @@ final class PublisherImpl implements Publisher { private final Duration sendBundleDeadline; private ScheduledFuture currentAlarmFuture; - PublisherImpl(Builder builder) throws IOException { - topic = builder.topic; - - maxBundleMessages = builder.bundlingSettings.getElementCountThreshold(); - maxBundleBytes = builder.bundlingSettings.getRequestByteThreshold(); - maxBundleDuration = builder.bundlingSettings.getDelayThreshold(); + PublisherImpl(String topic, Settings settings) throws IOException { + this.topic = topic; + maxBundleMessages = settings.getBundlingSettings().getElementCountThreshold(); + maxBundleBytes = settings.getBundlingSettings().getRequestByteThreshold(); + maxBundleDuration = settings.getBundlingSettings().getDelayThreshold(); hasBundlingBytes = maxBundleBytes > 0; - flowControlSettings = builder.flowControlSettings; - failOnFlowControlLimits = builder.failOnFlowControlLimits; + flowControlSettings = settings.getFlowControlSettings(); + failOnFlowControlLimits = settings.getFailOnFlowControlLimits(); this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); - sendBundleDeadline = builder.sendBundleDeadline; + sendBundleDeadline = settings.getSendBundleDeadline(); - requestTimeout = builder.requestTimeout; + requestTimeout = settings.getRequestTimeout(); messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); int numCores = Math.max(1, Runtime.getRuntime().availableProcessors()); executor = - builder.executor.isPresent() - ? builder.executor.get() + settings.getExecutor().isPresent() + ? settings.getExecutor().get() : Executors.newScheduledThreadPool( numCores * DEFAULT_MIN_THREAD_POOL_SIZE, new ThreadFactoryBuilder() @@ -123,9 +122,9 @@ final class PublisherImpl implements Publisher { channelIndex = new AtomicLong(0); for (int i = 0; i < numCores; i++) { channels[i] = - builder.channelBuilder.isPresent() - ? builder.channelBuilder.get().build() - : NettyChannelBuilder.forAddress(PUBSUB_API_ADDRESS, 443) + settings.getChannelBuilder().isPresent() + ? settings.getChannelBuilder().get().build() + : NettyChannelBuilder.forAddress(Publisher.Settings.PUBSUB_API_ADDRESS, 443) .negotiationType(NegotiationType.TLS) .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) .executor(executor) @@ -133,10 +132,10 @@ final class PublisherImpl implements Publisher { } credentials = MoreCallCredentials.from( - builder.userCredentials.isPresent() - ? builder.userCredentials.get() + settings.getUserCredentials().isPresent() + ? settings.getUserCredentials().get() : GoogleCredentials.getApplicationDefault() - .createScoped(Collections.singletonList(PUBSUB_API_SCOPE))); + .createScoped(Collections.singletonList(Publisher.Settings.PUBSUB_API_SCOPE))); shutdown = new AtomicBoolean(false); messagesWaiter = new MessagesWaiter(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index bd906b33ca74..aadd90d4a2e0 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -25,7 +25,6 @@ import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; -import com.google.cloud.pubsub.Publisher.Builder; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; @@ -100,15 +99,17 @@ public static void tearDownClass() throws Exception { @Test public void testPublishByDuration() throws Exception { Publisher publisher = - getTestPublisherBuilder() - // To demonstrate that reaching duration will trigger publish - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.standardSeconds(5)) - .setElementCountThreshold(10) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestSettingsBuilder() + // To demonstrate that reaching duration will trigger publish + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setDelayThreshold(Duration.standardSeconds(5)) + .setElementCountThreshold(10) + .build()) + .build()); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); @@ -132,14 +133,16 @@ public void testPublishByDuration() throws Exception { @Test public void testPublishByNumBundledMessages() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(2) - .setDelayThreshold(Duration.standardSeconds(100)) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestSettingsBuilder() + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(2) + .setDelayThreshold(Duration.standardSeconds(100)) + .build()) + .build()); testPublisherServiceImpl .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) @@ -171,14 +174,16 @@ public void testPublishByNumBundledMessages() throws Exception { @Test public void testSinglePublishByNumBytes() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(2) - .setDelayThreshold(Duration.standardSeconds(100)) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestSettingsBuilder() + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(2) + .setDelayThreshold(Duration.standardSeconds(100)) + .build()) + .build()); testPublisherServiceImpl .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")) @@ -205,15 +210,17 @@ public void testSinglePublishByNumBytes() throws Exception { @Test public void testPublishMixedSizeAndDuration() throws Exception { Publisher publisher = - getTestPublisherBuilder() - // To demonstrate that reaching duration will trigger publish - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(2) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); + new PublisherImpl( + TEST_TOPIC, + getTestSettingsBuilder() + // To demonstrate that reaching duration will trigger publish + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(2) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); @@ -253,15 +260,17 @@ private ListenableFuture sendTestMessage(Publisher publisher, String dat @Test public void testPublishFailureRetries() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(1) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish + new PublisherImpl( + TEST_TOPIC, + getTestSettingsBuilder() + .setExecutor(Executors.newSingleThreadScheduledExecutor()) + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(1) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -277,16 +286,18 @@ public void testPublishFailureRetries() throws Exception { @Test(expected = Throwable.class) public void testPublishFailureRetries_exceededsRetryDuration() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBundleDeadline(Duration.standardSeconds(10)) - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(1) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish + new PublisherImpl( + TEST_TOPIC, + getTestSettingsBuilder() + .setExecutor(Executors.newSingleThreadScheduledExecutor()) + .setSendBundleDeadline(Duration.standardSeconds(10)) + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(1) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -306,16 +317,18 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception @Test(expected = ExecutionException.class) public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exception { Publisher publisher = - getTestPublisherBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBundleDeadline(Duration.standardSeconds(10)) - .setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold(1) - .setDelayThreshold(Duration.standardSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish + new PublisherImpl( + TEST_TOPIC, + getTestSettingsBuilder() + .setExecutor(Executors.newSingleThreadScheduledExecutor()) + .setSendBundleDeadline(Duration.standardSeconds(10)) + .setBundlingSettings( + Publisher.Settings.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold(1) + .setDelayThreshold(Duration.standardSeconds(5)) + .build()) + .build()); // To demonstrate that reaching duration will trigger publish ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -334,9 +347,9 @@ public void testPublisherGetters() throws Exception { FakeCredentials credentials = new FakeCredentials(); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); + Publisher.Settings.Builder builder = Publisher.Settings.newBuilder(); builder.setChannelBuilder(testChannelBuilder); - builder.setCredentials(credentials); + builder.setUserCredentials(credentials); builder.setExecutor(executor); builder.setFailOnFlowControlLimits(true); builder.setBundlingSettings( @@ -352,7 +365,7 @@ public void testPublisherGetters() throws Exception { .build()); builder.setRequestTimeout(new Duration(15)); builder.setSendBundleDeadline(new Duration(16000)); - Publisher publisher = builder.build(); + Publisher publisher = new PublisherImpl(TEST_TOPIC, builder.build()); assertEquals(TEST_TOPIC, publisher.getTopic()); assertEquals(10, publisher.getMaxBundleBytes()); @@ -365,192 +378,56 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); - assertEquals(TEST_TOPIC, builder.topic); - assertEquals(Optional.absent(), builder.channelBuilder); - assertEquals(Optional.absent(), builder.executor); - assertFalse(builder.failOnFlowControlLimits); + Publisher.Settings settings = Publisher.Settings.newBuilder().autoBuild(); + assertEquals(Optional.absent(), settings.getChannelBuilder()); + assertEquals(Optional.absent(), settings.getExecutor()); + assertFalse(settings.getFailOnFlowControlLimits()); assertEquals( - Publisher.DEFAULT_MAX_BUNDLE_BYTES, - builder.bundlingSettings.getRequestByteThreshold().longValue()); + Publisher.Settings.DEFAULT_MAX_BUNDLE_BYTES, + settings.getBundlingSettings().getRequestByteThreshold().longValue()); assertEquals( - Publisher.DEFAULT_MAX_BUNDLE_DURATION, builder.bundlingSettings.getDelayThreshold()); + Publisher.Settings.DEFAULT_MAX_BUNDLE_DURATION, + settings.getBundlingSettings().getDelayThreshold()); assertEquals( - Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, - builder.bundlingSettings.getElementCountThreshold().longValue()); - assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings); - assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout); - assertEquals(Publisher.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline); - assertEquals(Optional.absent(), builder.userCredentials); + Publisher.Settings.DEFAULT_MAX_BUNDLE_MESSAGES, + settings.getBundlingSettings().getElementCountThreshold().longValue()); + assertEquals(FlowController.Settings.DEFAULT, settings.getFlowControlSettings()); + assertEquals(Publisher.Settings.DEFAULT_REQUEST_TIMEOUT, settings.getRequestTimeout()); + assertEquals(Publisher.Settings.MIN_SEND_BUNDLE_DURATION, settings.getSendBundleDeadline()); + assertEquals(Optional.absent(), settings.getUserCredentials()); } @Test public void testBuilderInvalidArguments() { - Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC); + Publisher.Settings.newBuilder().build(); + Publisher.Settings.newBuilder() + .setRequestTimeout(Publisher.Settings.MIN_REQUEST_TIMEOUT) + .build(); try { - builder.setChannelBuilder(null); - fail("Should have thrown an IllegalArgumentException"); - } catch (NullPointerException expected) { - // Expected - } - - try { - builder.setCredentials(null); - fail("Should have thrown an IllegalArgumentException"); - } catch (NullPointerException expected) { - // Expected - } - - try { - builder.setExecutor(null); - fail("Should have thrown an IllegalArgumentException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setRequestByteThreshold((Long) null) - .build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(0).build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(-1).build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(new Duration(1)).build()); - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setDelayThreshold(null).build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setDelayThreshold(new Duration(-1)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build()); - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS - .toBuilder() - .setElementCountThreshold((Long) null) - .build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(0).build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(-1).build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingRequestBytes(Optional.of(1)) - .build()); - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingRequestBytes(Optional.of(0)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingRequestBytes(Optional.of(-1)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingElementCount(Optional.of(1)) - .build()); - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingElementCount(Optional.of(0)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setFlowControlSettings( - FlowController.Settings.DEFAULT - .toBuilder() - .setMaxOutstandingElementCount(Optional.of(-1)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - builder.setRequestTimeout(Publisher.MIN_REQUEST_TIMEOUT); - try { - builder.setRequestTimeout(Publisher.MIN_REQUEST_TIMEOUT.minus(1)); + Publisher.Settings.newBuilder() + .setRequestTimeout(Publisher.Settings.MIN_REQUEST_TIMEOUT.minus(1)) + .build(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION); + Publisher.Settings.newBuilder() + .setSendBundleDeadline(Publisher.Settings.MIN_SEND_BUNDLE_DURATION) + .build(); try { - builder.setSendBundleDeadline(Publisher.MIN_SEND_BUNDLE_DURATION.minus(1)); + Publisher.Settings.newBuilder() + .setSendBundleDeadline(Publisher.Settings.MIN_SEND_BUNDLE_DURATION.minus(1)) + .build(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } } - private Builder getTestPublisherBuilder() { - return Publisher.Builder.newBuilder(TEST_TOPIC) - .setCredentials(testCredentials) + private Publisher.Settings.Builder getTestSettingsBuilder() { + return Publisher.Settings.newBuilder() + .setUserCredentials(testCredentials) .setExecutor(fakeExecutor) .setChannelBuilder(testChannelBuilder); } From ac39e171e525e48cafb86526741080138a759e58 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Tue, 10 Jan 2017 17:30:17 +1100 Subject: [PATCH 2/2] split options --- .../com/google/cloud/pubsub/PubSubImpl.java | 2 +- .../google/cloud/pubsub/PubSubOptions.java | 7 +- .../com/google/cloud/pubsub/Publisher.java | 34 ------- .../google/cloud/pubsub/PublisherImpl.java | 65 +++++------- .../cloud/pubsub/PublisherImplTest.java | 99 ++++++++++++++----- 5 files changed, 108 insertions(+), 99 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 45d309012d8d..a3ac4924f299 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -289,7 +289,7 @@ public Future> listTopicsAsync(ListOption... options) { public Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException { String topicName = PublisherClient.formatTopicName(getOptions().getProjectId(), topic); try { - return new PublisherImpl(topicName, settings); + return new PublisherImpl(topicName, getOptions(), settings); } catch (IOException e) { throw new PubSubException(e, false); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java index ac8262d1af97..1b58b86294bb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java @@ -16,13 +16,13 @@ package com.google.cloud.pubsub; +import com.google.api.gax.grpc.ChannelProvider; import com.google.cloud.GrpcServiceOptions; import com.google.cloud.pubsub.spi.DefaultPubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpcFactory; import com.google.cloud.pubsub.spi.v1.PublisherSettings; import com.google.common.collect.ImmutableSet; - import java.io.IOException; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -103,6 +103,11 @@ protected ExecutorFactory getExecutorFactory() { return super.getExecutorFactory(); } + @Override + protected ChannelProvider getChannelProvider() { + return super.getChannelProvider(); + } + @Override protected PubSubFactory getDefaultServiceFactory() { return DefaultPubSubFactory.INSTANCE; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index b6385d73b127..28ebca802948 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -18,15 +18,12 @@ import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; -import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.auto.value.AutoValue; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.pubsub.v1.PubsubMessage; -import io.grpc.ManagedChannelBuilder; -import java.util.concurrent.ScheduledExecutorService; import org.joda.time.Duration; /** @@ -176,23 +173,12 @@ public abstract class Settings { abstract Duration getRequestTimeout(); - abstract Optional getUserCredentials(); - - abstract Optional>> - getChannelBuilder(); - - abstract Optional getExecutor(); - public static Builder newBuilder() { return new AutoValue_Publisher_Settings.Builder() .setFlowControlSettings(FlowController.Settings.DEFAULT) .setFailOnFlowControlLimits(false) .setSendBundleDeadline(MIN_SEND_BUNDLE_DURATION) .setRequestTimeout(DEFAULT_REQUEST_TIMEOUT) - .setUserCredentials(Optional.absent()) - .setChannelBuilder( - Optional.>>absent()) - .setExecutor(Optional.absent()) .setBundlingSettings(DEFAULT_BUNDLING_SETTINGS); } @@ -208,26 +194,6 @@ abstract static class Builder { abstract Builder setRequestTimeout(Duration value); - abstract Builder setUserCredentials(Optional value); - - Builder setUserCredentials(Credentials value) { - return setUserCredentials(Optional.of(value)); - } - - abstract Builder setChannelBuilder( - Optional>> value); - - Builder setChannelBuilder(ManagedChannelBuilder> value) { - return setChannelBuilder( - Optional.>>of(value)); - } - - abstract Builder setExecutor(Optional value); - - Builder setExecutor(ScheduledExecutorService value) { - return setExecutor(Optional.of(value)); - } - abstract Settings autoBuild(); public Settings build() { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java index 78eb097d4ba5..658fb6028861 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java @@ -17,7 +17,6 @@ package com.google.cloud.pubsub; import com.google.api.gax.bundling.FlowController; -import com.google.auth.oauth2.GoogleCredentials; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; @@ -25,24 +24,18 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; import com.google.pubsub.v1.PubsubMessage; import io.grpc.CallCredentials; -import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; -import io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; import java.io.IOException; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -63,6 +56,7 @@ final class PublisherImpl implements Publisher { private static final Logger logger = LoggerFactory.getLogger(PublisherImpl.class); private final String topic; + private final PubSubOptions psOptions; private final long maxBundleMessages; private final long maxBundleBytes; @@ -79,7 +73,7 @@ final class PublisherImpl implements Publisher { private final AtomicBoolean activeAlarm; private final FlowController flowController; - private final Channel[] channels; + private final ManagedChannel[] channels; private final AtomicLong channelIndex; private final CallCredentials credentials; private final Duration requestTimeout; @@ -90,8 +84,9 @@ final class PublisherImpl implements Publisher { private final Duration sendBundleDeadline; private ScheduledFuture currentAlarmFuture; - PublisherImpl(String topic, Settings settings) throws IOException { + PublisherImpl(String topic, PubSubOptions psOptions, Settings settings) throws IOException { this.topic = topic; + this.psOptions = psOptions; maxBundleMessages = settings.getBundlingSettings().getElementCountThreshold(); maxBundleBytes = settings.getBundlingSettings().getRequestByteThreshold(); maxBundleDuration = settings.getBundlingSettings().getDelayThreshold(); @@ -108,36 +103,23 @@ final class PublisherImpl implements Publisher { messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); - int numCores = Math.max(1, Runtime.getRuntime().availableProcessors()); - executor = - settings.getExecutor().isPresent() - ? settings.getExecutor().get() - : Executors.newScheduledThreadPool( - numCores * DEFAULT_MIN_THREAD_POOL_SIZE, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("cloud-pubsub-publisher-thread-%d") - .build()); - channels = new Channel[numCores]; - channelIndex = new AtomicLong(0); - for (int i = 0; i < numCores; i++) { - channels[i] = - settings.getChannelBuilder().isPresent() - ? settings.getChannelBuilder().get().build() - : NettyChannelBuilder.forAddress(Publisher.Settings.PUBSUB_API_ADDRESS, 443) - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .executor(executor) - .build(); - } - credentials = - MoreCallCredentials.from( - settings.getUserCredentials().isPresent() - ? settings.getUserCredentials().get() - : GoogleCredentials.getApplicationDefault() - .createScoped(Collections.singletonList(Publisher.Settings.PUBSUB_API_SCOPE))); shutdown = new AtomicBoolean(false); messagesWaiter = new MessagesWaiter(); + + int numCores = Runtime.getRuntime().availableProcessors(); + executor = psOptions.getExecutorFactory().get(); + + channels = new ManagedChannel[numCores]; + for (int i = 0; i < numCores; i++) { + if (psOptions.getChannelProvider().needsExecutor()) { + channels[i] = psOptions.getChannelProvider().getChannel(executor); + } else { + channels[i] = psOptions.getChannelProvider().getChannel(); + } + } + channelIndex = new AtomicLong(0); + + credentials = MoreCallCredentials.from(psOptions.getCredentials()); } @Override @@ -420,6 +402,13 @@ public void shutdown() { } publishAllOustanding(); messagesWaiter.waitNoMessages(); + + if (psOptions.getChannelProvider().shouldAutoClose()) { + for (ManagedChannel channel : channels) { + channel.shutdown(); + } + } + psOptions.getExecutorFactory().release(executor); } private static long computeNextBackoffDelayMs(OutstandingBundle outstandingBundle) { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index aadd90d4a2e0..97b649fee54f 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -25,12 +25,15 @@ import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; +import com.google.api.gax.grpc.ChannelProvider; +import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; @@ -38,6 +41,7 @@ import io.grpc.internal.ServerImpl; import io.grpc.stub.StreamObserver; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.joda.time.Duration; @@ -56,6 +60,7 @@ @RunWith(JUnit4.class) public class PublisherImplTest { + private static final String TEST_PROJECT_NAME = "test-project"; private static final String TEST_TOPIC = "projects/test-project/topics/test-topic"; private static InProcessChannelBuilder testChannelBuilder; @@ -101,7 +106,8 @@ public void testPublishByDuration() throws Exception { Publisher publisher = new PublisherImpl( TEST_TOPIC, - getTestSettingsBuilder() + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() // To demonstrate that reaching duration will trigger publish .setBundlingSettings( Publisher.Settings.DEFAULT_BUNDLING_SETTINGS @@ -135,7 +141,8 @@ public void testPublishByNumBundledMessages() throws Exception { Publisher publisher = new PublisherImpl( TEST_TOPIC, - getTestSettingsBuilder() + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() .setBundlingSettings( Publisher.Settings.DEFAULT_BUNDLING_SETTINGS .toBuilder() @@ -176,7 +183,8 @@ public void testSinglePublishByNumBytes() throws Exception { Publisher publisher = new PublisherImpl( TEST_TOPIC, - getTestSettingsBuilder() + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() .setBundlingSettings( Publisher.Settings.DEFAULT_BUNDLING_SETTINGS .toBuilder() @@ -212,7 +220,8 @@ public void testPublishMixedSizeAndDuration() throws Exception { Publisher publisher = new PublisherImpl( TEST_TOPIC, - getTestSettingsBuilder() + getTestOptions(fakeExecutor), + Publisher.Settings.newBuilder() // To demonstrate that reaching duration will trigger publish .setBundlingSettings( Publisher.Settings.DEFAULT_BUNDLING_SETTINGS @@ -262,8 +271,8 @@ public void testPublishFailureRetries() throws Exception { Publisher publisher = new PublisherImpl( TEST_TOPIC, - getTestSettingsBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) + getTestOptions(Executors.newSingleThreadScheduledExecutor()), + Publisher.Settings.newBuilder() .setBundlingSettings( Publisher.Settings.DEFAULT_BUNDLING_SETTINGS .toBuilder() @@ -288,8 +297,8 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception Publisher publisher = new PublisherImpl( TEST_TOPIC, - getTestSettingsBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) + getTestOptions(Executors.newSingleThreadScheduledExecutor()), + Publisher.Settings.newBuilder() .setSendBundleDeadline(Duration.standardSeconds(10)) .setBundlingSettings( Publisher.Settings.DEFAULT_BUNDLING_SETTINGS @@ -319,8 +328,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce Publisher publisher = new PublisherImpl( TEST_TOPIC, - getTestSettingsBuilder() - .setExecutor(Executors.newSingleThreadScheduledExecutor()) + getTestOptions(Executors.newSingleThreadScheduledExecutor()), + Publisher.Settings.newBuilder() .setSendBundleDeadline(Duration.standardSeconds(10)) .setBundlingSettings( Publisher.Settings.DEFAULT_BUNDLING_SETTINGS @@ -344,13 +353,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce @Test public void testPublisherGetters() throws Exception { - FakeCredentials credentials = new FakeCredentials(); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - Publisher.Settings.Builder builder = Publisher.Settings.newBuilder(); - builder.setChannelBuilder(testChannelBuilder); - builder.setUserCredentials(credentials); - builder.setExecutor(executor); builder.setFailOnFlowControlLimits(true); builder.setBundlingSettings( BundlingSettings.newBuilder() @@ -365,7 +368,8 @@ public void testPublisherGetters() throws Exception { .build()); builder.setRequestTimeout(new Duration(15)); builder.setSendBundleDeadline(new Duration(16000)); - Publisher publisher = new PublisherImpl(TEST_TOPIC, builder.build()); + Publisher publisher = + new PublisherImpl(TEST_TOPIC, getTestOptions(fakeExecutor), builder.build()); assertEquals(TEST_TOPIC, publisher.getTopic()); assertEquals(10, publisher.getMaxBundleBytes()); @@ -379,8 +383,6 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { Publisher.Settings settings = Publisher.Settings.newBuilder().autoBuild(); - assertEquals(Optional.absent(), settings.getChannelBuilder()); - assertEquals(Optional.absent(), settings.getExecutor()); assertFalse(settings.getFailOnFlowControlLimits()); assertEquals( Publisher.Settings.DEFAULT_MAX_BUNDLE_BYTES, @@ -394,7 +396,6 @@ public void testBuilderParametersAndDefaults() { assertEquals(FlowController.Settings.DEFAULT, settings.getFlowControlSettings()); assertEquals(Publisher.Settings.DEFAULT_REQUEST_TIMEOUT, settings.getRequestTimeout()); assertEquals(Publisher.Settings.MIN_SEND_BUNDLE_DURATION, settings.getSendBundleDeadline()); - assertEquals(Optional.absent(), settings.getUserCredentials()); } @Test @@ -425,10 +426,58 @@ public void testBuilderInvalidArguments() { } } - private Publisher.Settings.Builder getTestSettingsBuilder() { - return Publisher.Settings.newBuilder() - .setUserCredentials(testCredentials) - .setExecutor(fakeExecutor) - .setChannelBuilder(testChannelBuilder); + private PubSubOptions getTestOptions(final ScheduledExecutorService executor) { + return new TestPubSubOptions( + PubSubOptions.newBuilder() + .setProjectId(TEST_PROJECT_NAME) + .setCredentials(testCredentials) + .setExecutorFactory( + new ExecutorFactory() { + @Override + public ScheduledExecutorService get() { + return executor; + } + + @Override + public void release(ScheduledExecutorService executor) { + // do nothing + } + }), + new ChannelProvider() { + @Override + public boolean shouldAutoClose() { + return true; + } + + @Override + public boolean needsExecutor() { + return false; + } + + @Override + public ManagedChannel getChannel() { + return testChannelBuilder.build(); + } + + @Override + public ManagedChannel getChannel(Executor executor) { + throw new IllegalStateException( + "getChannel(Executor) called when needsExecutor() is false."); + } + }); + } + + private static class TestPubSubOptions extends PubSubOptions { + private final ChannelProvider channelProvider; + + TestPubSubOptions(PubSubOptions.Builder b, ChannelProvider c) { + super(b); + this.channelProvider = c; + } + + @Override + protected ChannelProvider getChannelProvider() { + return channelProvider; + } } }