8000 return Name objects instead of plain String by pongad · Pull Request #1562 · googleapis/google-cloud-java · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.SubscriptionName;
Expand Down Expand Up @@ -52,7 +51,7 @@ public ListenableFuture<MessageReceiver.AckReply> receiveMessage(PubsubMessage m
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.Builder.newBuilder(subscription, receiver).build();
subscriber = Subscriber.newBuilder(subscription, receiver).build();
subscriber.addListener(

This comment was marked as spam.

This comment was marked as spam.

new Subscriber.SubscriberListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static void main(String... args) throws Exception {

Publisher publisher = null;
try {
publisher = Publisher.Builder.newBuilder(topic).build();
publisher = Publisher.newBuilder(topic).build();
List<String> messages = Arrays.asList("first message", "second message");
List<ListenableFuture<String>> messageIds = new ArrayList<>();
for (String message : messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
*
* <pre><code>
* Publisher publisher =
* Publisher.Builder.newBuilder(MY_TOPIC)
* Publisher.newBuilder(MY_TOPIC)
* .setMaxBundleDuration(new Duration(10 * 1000))
* .build();
* List&lt;ListenableFuture&lt;String&gt;&gt; results = new ArrayList&lt;&gt;();
Expand Down Expand Up @@ -122,7 +122,8 @@ public static long getApiMaxRequestBytes() {

private static final Logger logger = LoggerFactory.getLogger(Publisher.class);

private final String topic;
private final TopicName topicName;
private final String cachedTopicNameString;

private final BundlingSettings bundlingSettings;
private final RetrySettings retrySettings;
Expand All @@ -149,7 +150,8 @@ public static long getApiMaxRequestBytes() {
private ScheduledFuture<?> currentAlarmFuture;

private Publisher(Builder builder) throws IOException {
topic = builder.topic;
topicName = builder.topicName;
cachedTopicNameString = topicName.toString();

this.bundlingSettings = builder.bundlingSettings;
this.retrySettings = builder.retrySettings;
Expand Down Expand Up @@ -198,8 +200,8 @@ public void close() throws IOException {
}

/** Topic which the publisher publishes to. */
public String getTopic() {
return topic;
public TopicName getTopicName() {
return topicName;
}

/**
Expand Down Expand Up @@ -333,7 +335,7 @@ private void publishAllOutstanding() {

private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) {
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
publishRequest.setTopic(topic);
publishRequest.setTopic(cachedTopicNameString);
for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) {
publishRequest.addMessages(outstandingPublish.message);
}
Expand Down Expand Up @@ -528,6 +530,11 @@ interface LongRandom {
long nextLong(long least, long bound);
}

/** Constructs a new {@link Builder} using the given topic. */
public static Builder newBuilder(TopicName topicName) {
return new Builder(topicName);
}

/** A builder of {@link Publisher}s. */
public static final class Builder {
static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Expand Down Expand Up @@ -569,7 +576,7 @@ public long nextLong(long least, long bound) {
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
.build();

String topic;
TopicName topicName;

// Bundling options
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
Expand All @@ -588,13 +595,8 @@ public long nextLong(long least, long bound) {

ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;

/** Constructs a new {@link Builder} using the given topic. */
public static Builder newBuilder(TopicName topic) {
return new Builder(topic.toString());
}

Builder(String topic) {
this.topic = Preconditions.checkNotNull(topic);
private Builder(TopicName topic) {
this.topicName = Preconditions.checkNotNull(topic);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import com.google.common.util.concurrent.Service;
import com.google.pubsub.v1.SubscriptionName;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
Expand All @@ -43,9 +41,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,7 +88,7 @@
* }
*
* Subscriber subscriber =
* Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
* Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
* .setMaxBundleAcks(100)
* .build();
*
Expand Down Expand Up @@ -123,19 +123,33 @@ private Subscriber(Builder builder) throws IOException {
impl = new SubscriberImpl(builder);
}

/**
* Constructs a new {@link Builder}.
*
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
* Subscriber}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
return new Builder(subscription, receiver);
}

/** Subscription which the subscriber is subscribed to. */
public String getSubscription() {
return impl.getSubscription();
public SubscriptionName getSubscriptionName() {
return impl.subscriptionName;
}

/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
public Duration getAckExpirationPadding() {
return impl.getAckExpirationPadding();
return impl.ackExpirationPadding;
}

/** The flow control settings the Subscriber is configured with. */
public FlowController.Settings getFlowControlSettings() {
return impl.getFlowControlSettings();
return impl.flowControlSettings;
}

public void addListener(final SubscriberListener listener, Executor executor) {
Expand Down Expand Up @@ -249,7 +263,8 @@ public void terminated(State from) {}
private static class SubscriberImpl extends AbstractService {
private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);

private final String subscription;
private final SubscriptionName subscriptionName;
private final String cachedSubscriptionNameString;
private final FlowController.Settings flowControlSettings;
private final Duration ackExpirationPadding;
private final ScheduledExecutorService executor;
Expand All @@ -270,7 +285,8 @@ private static class SubscriberImpl extends AbstractService {
private SubscriberImpl(Builder builder) throws IOException {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
subscription = builder.subscription;
subscriptionName = builder.subscriptionName;
cachedSubscriptionNameString = subscriptionName.toString();
ackExpirationPadding = builder.ackExpirationPadding;
streamAckDeadlineSeconds =
Math.max(
Expand Down Expand Up @@ -340,7 +356,7 @@ private void startStreamingConnections() {
for (int i = 0; i < numChannels; i++) {
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
subscription,
cachedSubscriptionNameString,
credentials,
receiver,
ackExpirationPadding,
Expand Down Expand Up @@ -412,7 +428,7 @@ private void startPollingConnections() {
for (int i = 0; i < numChannels; i++) {
pollingSubscriberConnections.add(
new PollingSubscriberConnection(
subscription,
cachedSubscriptionNameString,
credentials,
receiver,
ackExpirationPadding,
Expand Down Expand Up @@ -496,21 +512,6 @@ public void run() {
throw new IllegalStateException(e);
}
}

/** Subscription which the subscriber is subscribed to. */
public String getSubscription() {
return subscription;
}

/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
public Duration getAckExpirationPadding() {
return ackExpirationPadding;
}

/** The flow control settings the Subscriber is configured with. */
public FlowController.Settings getFlowControlSettings() {
return flowControlSettings;
}
}

/** Builder of {@link Subscriber Subscribers}. */
Expand All @@ -526,7 +527,7 @@ public static final class Builder {
* Runtime.getRuntime().availableProcessors())
.build();

String subscription;
SubscriptionName subscriptionName;
Optional<Credentials> credentials = Optional.absent();
MessageReceiver receiver;

Expand All @@ -539,22 +540,8 @@ public static final class Builder {
Optional.absent();
Optional<Clock> clock = Optional.absent();

/**
* Constructs a new {@link Builder}.
*
* <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
* Subscriber}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(SubscriptionName subscription, MessageReceiver receiver) {
return new Builder(subscription.toString(), receiver);
}

Builder(String subscription, MessageReceiver receiver) {
this.subscription = subscription;
Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
this.subscriptionName = subscriptionName;
this.receiver = receiver;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
public void testPublisherGetters() throws Exception {
FakeCredentials credentials = new FakeCredentials();

Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
builder.setChannelBuilder(testChannelBuilder);
builder.setCredentials(credentials);
builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
Expand All @@ -370,7 +370,7 @@ public void testPublisherGetters() throws Exception {
.build());
Publisher publisher = builder.build();

assertEquals(TEST_TOPIC.toString(), publisher.getTopic());
assertEquals(TEST_TOPIC, publisher.getTopicName());
assertEquals(10, (long) publisher.getBundlingSettings().getRequestByteThreshold());
assertEquals(new Duration(11), publisher.getBundlingSettings().getDelayThreshold());
assertEquals(12, (long) publisher.getBundlingSettings().getElementCountThreshold());
Expand All @@ -384,8 +384,8 @@ public void testPublisherGetters() throws Exception {

@Test
public void testBuilderParametersAndDefaults() {
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
assertEquals(TEST_TOPIC.toString(), builder.topic);
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
assertEquals(TEST_TOPIC, builder.topicName);
assertEquals(Optional.absent(), builder.channelBuilder);
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
assertFalse(builder.failOnFlowControlLimits);
Expand All @@ -404,7 +404,7 @@ public void testBuilderParametersAndDefaults() {

@Test
public void testBuilderInvalidArguments() {
Publisher.Builder builder = Publisher.Builder.newBuilder(TEST_TOPIC);
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);

try {
builder.setChannelBuilder(null);
Expand Down Expand Up @@ -601,7 +601,7 @@ public void testBuilderInvalidArguments() {
}

private Builder getTestPublisherBuilder() {
return Publisher.Builder.newBuilder(TEST_TOPIC)
return Publisher.newBuilder(TEST_TOPIC)
.setCredentials(testCredentials)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setChannelBuilder(testChannelBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullResponse;
Expand Down Expand Up @@ -467,7 +466,7 @@ private void sendMessages(Iterable<String> ackIds) throws InterruptedException {
}

private Builder getTestSubscriberBuilder(MessageReceiver receiver) {
return Subscriber.Builder.newBuilder(TEST_SUBSCRIPTION, receiver)
return Subscriber.newBuilder(TEST_SUBSCRIPTION, receiver)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setCredentials(testCredentials)
.setChannelBuilder(testChannelBuilder)
Expand Down
0