@@ -170,6 +170,84 @@ def delete_topic(project_id: str, topic_id: str) -> None:
170170 # [END pubsub_delete_topic]
171171
172172
173+ def pubsub_publish_otel_tracing (
174+ topic_project_id : str , trace_project_id : str , topic_id : str
175+ ) -> None :
176+ """
177+ Publish to `topic_id` in `topic_project_id` with OpenTelemetry enabled.
178+ Export the OpenTelemetry traces to Google Cloud Trace in project
179+ `trace_project_id`
180+
181+ Args:
182+ topic_project_id: project ID of the topic to publish to.
183+ trace_project_id: project ID to export Cloud Trace to.
184+ topic_id: topic ID to publish to.
185+
186+ Returns:
187+ None
188+ """
189+ # [START pubsub_publish_otel_tracing]
190+
191+ from opentelemetry import trace
192+ from opentelemetry .sdk .trace import TracerProvider
193+ from opentelemetry .sdk .trace .export import (
194+ BatchSpanProcessor ,
195+ )
196+ from opentelemetry .exporter .cloud_trace import CloudTraceSpanExporter
197+ from opentelemetry .sdk .trace .sampling import TraceIdRatioBased , ParentBased
198+
199+ from google .cloud .pubsub_v1 import PublisherClient
200+ from google .cloud .pubsub_v1 .types import PublisherOptions
201+
202+ # TODO(developer)
203+ # topic_project_id = "your-topic-project-id"
204+ # trace_project_id = "your-trace-project-id"
205+ # topic_id = "your-topic-id"
206+
207+ # In this sample, we use a Google Cloud Trace to export the OpenTelemetry
208+ # traces: https://cloud.google.com/trace/docs/setup/python-ot
209+ # Choose and configure the exporter for your set up accordingly.
210+
211+ # Sample 1 in every 1000 traces.
212+ sampler = ParentBased (root = TraceIdRatioBased (1 ))
213+ trace .set_tracer_provider (TracerProvider (sampler = sampler ))
214+
215+ # Export to Google Trace.
216+ cloud_trace_exporter = CloudTraceSpanExporter (
217+ project_id = trace_project_id ,
218+ )
219+ trace .get_tracer_provider ().add_span_processor (
220+ BatchSpanProcessor (cloud_trace_exporter )
221+ )
222+
223+ # Set the `enable_open_telemetry_tracing` option to True when creating
224+ # the publisher client. This in itself is necessary and sufficient for
225+ # the library to export OpenTelemetry traces. However, where the traces
226+ # must be exported to needs to be configured based on your OpenTelemetry
227+ # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
228+ publisher = PublisherClient (
229+ publisher_options = PublisherOptions (
230+ enable_open_telemetry_tracing = True ,
231+ ),
232+ )
233+
234+ # The `topic_path` method creates a fully qualified identifier
235+ # in the form `projects/{project_id}/topics/{topic_id}`
236+ topic_path = publisher .topic_path (topic_project_id , topic_id )
237+ # Publish messages.
238+ for n in range (1 , 10000 ):
239+ data_str = f"Message number { n } "
240+ # Data must be a bytestring
241+ data = data_str .encode ("utf-8" )
242+ # When you publish a message, the client returns a future.
243+ future = publisher .publish (topic_path , data )
244+ print (future .result ())
245+
246+ print (f"Published messages to { topic_path } ." )
247+
248+ # [END pubsub_publish_otel_tracing]
249+
250+
173251def publish_messages (project_id : str , topic_id : str ) -> None :
174252 """Publishes multiple messages to a Pub/Sub topic."""
175253 # [START pubsub_quickstart_publisher]
@@ -522,6 +600,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
522600 create_parser = subparsers .add_parser ("create" , help = create_topic .__doc__ )
523601 create_parser .add_argument ("topic_id" )
524602
603+ pubsub_publish_otel_tracing_parser = subparsers .add_parser (
604+ "pubsub-publish-otel-tracing" , help = pubsub_publish_otel_tracing .__doc__
605+ )
606+ pubsub_publish_otel_tracing_parser .add_argument ("topic_project_id" )
607+ pubsub_publish_otel_tracing_parser .add_argument ("trace_project_id" )
608+ pubsub_publish_otel_tracing_parser .add_argument ("topic_id" )
609+
525610 create_topic_with_kinesis_ingestion_parser = subparsers .add_parser (
526611 "create_kinesis_ingestion" , help = create_topic_with_kinesis_ingestion .__doc__
527612 )
@@ -638,3 +723,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
638723 resume_publish_with_ordering_keys (args .project_id , args .topic_id )
639724 elif args .command == "detach-subscription" :
640725 detach_subscription (args .project_id , args .subscription_id )
726+ elif args .command == "pubsub-publish-otel-tracing" :
727+ pubsub_publish_otel_tracing (
728+ args .topic_project_id , args .trace_project_id , args .topic_id
729+ )
0 commit comments