Working example · hanv89/python-docs-samples@01c5127 · GitHub
[go: up one dir, main page]

Skip to content

Commit 01c5127

Browse files
committed
Working example
1 parent 987bee8 commit 01c5127

File tree

1 file changed

+34
-20
lines changed

1 file changed

+34
-20
lines changed

pubsub/streaming-analytics/PubSubToGCS.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import argparse
2+
import datetime
23
import logging
34

45
import apache_beam as beam
@@ -8,6 +9,28 @@
89
from apache_beam.options.pipeline_options import StandardOptions
910

1011

12+
class ProcessDoFn(beam.DoFn):
    """Turn a raw message payload into a text line tagged with a timestamp."""

    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Yield the decoded element followed by its formatted timestamp.

        Args:
            element: Payload exposing ``decode``; it is decoded as UTF-8.
            publish_time: Value supplied by Beam through
                ``DoFn.TimestampParam``; converted with ``float()`` and
                interpreted as seconds since the epoch, in UTC.
                NOTE(review): presumably the Pub/Sub publish time of the
                message -- confirm against the source transform.

        Yields:
            A single string: ``<decoded element> published at <timestamp>``.
        """
        epoch_seconds = float(publish_time)
        moment = datetime.datetime.utcfromtimestamp(epoch_seconds)
        stamp = moment.strftime("%Y-%m-%d %H:%M:%S.%f")
        text = element.decode('utf-8')
        yield text + ' published at ' + stamp
17+
18+
19+
class CustomDoFn(beam.DoFn):
    """Write one batch of elements to a file named after its window."""

    def __init__(self, output_path):
        """Remember the prefix that every output file name starts with."""
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write ``batch`` to a single file through ``GcsIO``.

        The file name is the output path followed by the window's start and
        end, both rendered in UTC. Each element is converted with ``str()``
        and the results are joined with newlines.

        Args:
            batch: Iterable of elements to write.
            window: Window supplied by Beam through ``DoFn.WindowParam``;
                its ``start`` and ``end`` are used to build the file name.
        """
        ts_format = '%Y-%m-%d %H:%M:%S'
        start_label, end_label = (
            bound.to_utc_datetime().strftime(ts_format)
            for bound in (window.start, window.end)
        )
        target = self.output_path + ' ' + start_label + ' to ' + end_label
        body = '\n'.join(map(str, batch))

        # Encoding stays inside the ``with`` so the file is opened first,
        # exactly as before.
        with beam.io.gcp.gcsio.GcsIO().open(filename=target, mode='w') as sink:
            sink.write(body.encode('utf-8'))
32+
33+
1134
def run(argv=None):
1235
parser = argparse.ArgumentParser()
1336
parser.add_argument(
@@ -28,26 +51,17 @@ def run(argv=None):
2851
pipeline_options = PipelineOptions(pipeline_args)
2952
pipeline_options.view_as(SetupOptions).save_main_session = True
3053
pipeline_options.view_as(StandardOptions).streaming = True
31-
p = beam.Pipeline(options=pipeline_options)
32-
33-
# Read from Cloud Pub/Sub into a PCollection.
34-
if known_args.input_topic:
35-
messages = (p
36-
| 'Read Pub/Sub Messages' >> beam.io.ReadFromPubSub(
37-
topic=known_args.input_topic)
38-
.with_output_types(bytes))
39-
40-
# Group messages by fixed-sized minute intervals.
41-
transformed = (messages
42-
| beam.WindowInto(
43-
window.FixedWindows(known_args.window_size))
44-
.with_output_types(bytes))
45-
46-
# Output to GCS
47-
transformed | beam.io.WriteToText(file_path_prefix=known_args.output_path)
48-
49-
result = p.run()
50-
result.wait_until_finish()
54+
55+
with beam.Pipeline(options=pipeline_options) as p:
56+
(p
57+
| 'Read Pub/Sub Messages' >> beam.io.ReadFromPubSub(
58+
topic=known_args.input_topic)
59+
| beam.WindowInto(window.FixedWindows(known_args.window_size*60))
60+
| beam.ParDo(ProcessDoFn())
61+
| beam.Map(lambda e: (None, e))
62+
| beam.GroupByKey()
63+
| beam.MapTuple(lambda k, v: v)
64+
| 'Write to GCS' >> beam.ParDo(CustomDoFn(known_args.output_path)))
5165

5266
if __name__ == '__main__': # noqa
5367
logging.getLogger().setLevel(logging.INFO)

0 commit comments

Comments
 (0)
0