From 6dd436d2fab6f48c388b33084b675bbdd696a337 Mon Sep 17 00:00:00 2001 From: mayur-solace Date: Wed, 27 Sep 2023 10:24:48 -0400 Subject: [PATCH 1/3] bug reproduction --- .../SpringCloudGCPBug2158Publisher.java | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java diff --git a/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java new file mode 100644 index 000000000..2bdf1f3b0 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java @@ -0,0 +1,123 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_publish_with_ordering_keys] + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class SpringCloudGCPBug2158Publisher { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + final String projectId = "capable-stream-180018"; + final String topicId = "mpatel-test-topic1"; + + createTopic(projectId, topicId); + publishWithOrderingKeysExample(projectId, topicId); + } + + public static void createTopic(String projectId, String topicId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + Topic topic = topicAdminClient.createTopic(topicName); + System.out.println("Created topic: " + topic.getName()); + } catch (AlreadyExistsException ae) { + //ignore if topic already exists. + } + } + + public static void publishWithOrderingKeysExample(String projectId, String topicId) + throws IOException, InterruptedException { + TopicName topicName = TopicName.of(projectId, topicId); + // Create a publisher and set message ordering to true. + Publisher publisher = + Publisher.newBuilder(topicName) + // Sending messages to the same region ensures they are received in order + // even when multiple publishers are used. + .setEndpoint("us-east1-pubsub.googleapis.com:443") + .setEnableMessageOrdering(true) + .build(); + + try { + for (int i = 0; i < 1000; i++) { + Map messages = new LinkedHashMap(); + messages.put("message1", "key" + i); + messages.put("message2", "key" + i); + messages.put("message3", "key" + i); + + for (Map.Entry entry : messages.entrySet()) { + ByteString data = ByteString.copyFromUtf8(entry.getKey()); + PubsubMessage pubsubMessage = + PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build(); + ApiFuture future = publisher.publish(pubsubMessage); + + // Add an asynchronous callback to handle publish success / failure. + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + // Details on the API exception. + System.out.println(apiException.getStatusCode().getCode()); + System.out.println(apiException.isRetryable()); + } + System.out.println("Error publishing message : " + pubsubMessage.getData()); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic). + //System.out.println(pubsubMessage.getData() + " : " + messageId); + + System.out.printf("[%s] Id: %s, OrderingKey: %s, Data: %s%n", LocalDateTime.now(), + pubsubMessage.getMessageId(), pubsubMessage.getOrderingKey(), pubsubMessage.getData().toStringUtf8()); + } + }, + MoreExecutors.directExecutor()); + } + + //TODO: You may want to increase the delay + Thread.sleep(2000); //Delay by 2 second + } + } finally { + // When finished with the publisher, shutdown to free up resources. + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } +} +// [END pubsub_publish_with_ordering_keys] From 71f91a83b441995fcbc95c0ada0c3ad373767ea7 Mon Sep 17 00:00:00 2001 From: mayur-solace Date: Wed, 27 Sep 2023 10:26:16 -0400 Subject: [PATCH 2/3] bug reproduction --- .../SpringCloudGCPBug2158Subscriber.java | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java diff --git a/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java new file mode 100644 index 000000000..d917c42bf --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java @@ -0,0 +1,111 @@ +package pubsub; + +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +//Bug > https://github.com/GoogleCloudPlatform/spring-cloud-gcp/issues/2158 +public class SpringCloudGCPBug2158Subscriber { + + public static void main(String... args) throws Exception { + // TODO(developer): Set the GOOGLE_APPLICATION_CREDENTIALS to point to the service account key + //export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json + + // TODO(developer): Replace these variables before running the sample. + final String projectId = "your-project-id"; + final String topicId = "your-topic-id"; + + //TODO: toggle between true/false for testing behaviour of Subscription with ordering and without ordering + final boolean withOrderingEnabled = true; + + createTopic(projectId, topicId); + if (withOrderingEnabled) { + final String subscriptionIdWithOrdering = "mpatel-test-topic1-sub-ordered"; + createSubscription(projectId, topicId, subscriptionIdWithOrdering, true); + subscribeAsync(projectId, subscriptionIdWithOrdering); + } else { + String subscriptionIdNoOrdering = "mpatel-test-topic1-no-ordering"; + createSubscription(projectId, topicId, subscriptionIdNoOrdering, false); + subscribeAsync(projectId, subscriptionIdNoOrdering); + } + } + + public static void createTopic(String projectId, String topicId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + Topic topic = topicAdminClient.createTopic(topicName); + System.out.println("Created topic: " + topic.getName()); + } catch (AlreadyExistsException ae) { + //ignore if topic already exists. + } + } + + public static void createSubscription( + String projectId, String topicId, String subscriptionId, boolean enableMsgOrdering) + throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + Subscription subscription = + subscriptionAdminClient.createSubscription( + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + // Set message ordering to true for ordered messages in the subscription. + .setEnableMessageOrdering(enableMsgOrdering) + .build()); + + System.out.println("Created a subscription: " + subscription.getAllFields()); + } catch (AlreadyExistsException ae) { + //ignore if topic already exists. + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + Subscription subscription = subscriptionAdminClient.getSubscription(SubscriptionName.of(projectId, subscriptionId)); + System.out.println("Existing Subscription: " + subscription.getAllFields()); + } + } + } + + public static void subscribeAsync(String projectId, String subscriptionId) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + System.out.printf("[%s] Id: %s, OrderingKey: %s, Data: %s%n", LocalDateTime.now(), + message.getMessageId(), message.getOrderingKey(), message.getData().toStringUtf8()); + consumer.ack(); + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(5000, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } +} From e248f4411021c52e8821cada3aea089017bcc65d Mon Sep 17 00:00:00 2001 From: mayur-solace Date: Wed, 27 Sep 2023 10:37:42 -0400 Subject: [PATCH 3/3] Bug Reproduction --- .../main/java/pubsub/SpringCloudGCPBug2158Publisher.java | 9 +++++++-- .../java/pubsub/SpringCloudGCPBug2158Subscriber.java | 6 +++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java index 2bdf1f3b0..86f890fa2 100644 --- a/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java +++ b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Publisher.java @@ -39,9 +39,14 @@ public class SpringCloudGCPBug2158Publisher { public static void main(String... args) throws Exception { + // TODO(developer): Set the GOOGLE_APPLICATION_CREDENTIALS to point to the service account key + //Note: service account key is different from the subscriber's service account key + //You could also use ADC instead of service account key for publisher + //export GOOGLE_APPLICATION_CREDENTIALS=/path/to/publisher-service-account-key.json + // TODO(developer): Replace these variables before running the sample. - final String projectId = "capable-stream-180018"; - final String topicId = "mpatel-test-topic1"; + final String projectId = "your-project-id"; + final String topicId = "your-topic-id"; createTopic(projectId, topicId); publishWithOrderingKeysExample(projectId, topicId); diff --git a/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java index d917c42bf..147e38d3a 100644 --- a/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java +++ b/samples/snippets/src/main/java/pubsub/SpringCloudGCPBug2158Subscriber.java @@ -24,7 +24,7 @@ public class SpringCloudGCPBug2158Subscriber { public static void main(String... args) throws Exception { // TODO(developer): Set the GOOGLE_APPLICATION_CREDENTIALS to point to the service account key - //export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json + //export GOOGLE_APPLICATION_CREDENTIALS=/path/to/subscriber-service-account-key.json // TODO(developer): Replace these variables before running the sample. final String projectId = "your-project-id"; @@ -35,11 +35,11 @@ public static void main(String... args) throws Exception { createTopic(projectId, topicId); if (withOrderingEnabled) { - final String subscriptionIdWithOrdering = "mpatel-test-topic1-sub-ordered"; + final String subscriptionIdWithOrdering = "your-subscription-id-with-ordering"; createSubscription(projectId, topicId, subscriptionIdWithOrdering, true); subscribeAsync(projectId, subscriptionIdWithOrdering); } else { - String subscriptionIdNoOrdering = "mpatel-test-topic1-no-ordering"; + String subscriptionIdNoOrdering = "your-subscription-id-no-ordering"; createSubscription(projectId, topicId, subscriptionIdNoOrdering, false); subscribeAsync(projectId, subscriptionIdNoOrdering); }