hanv89/python-docs-samples

Commit b0aba17
Add sample
1 parent 661f47b commit b0aba17

File tree

3 files changed: +62 -4 lines changed
pubsub/streaming-analytics/PubSubToGCS.py

Lines changed: 54 additions & 0 deletions
import argparse
import logging

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic',
        help=('The Cloud Pub/Sub topic to read from: '
              '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".'))
    parser.add_argument(
        '--window_size',
        type=int,
        default=1,
        help='Output file\'s window size in number of minutes.')
    parser.add_argument(
        '--output_path',
        help='GCS path of the output file, including the filename prefix.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # One or more DoFns rely on global context, so save the main session
    # for the workers.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)

    # Read from Cloud Pub/Sub into a PCollection of raw message bytes.
    if known_args.input_topic:
        messages = (p
                    | 'Read Pub/Sub Messages' >> beam.io.ReadFromPubSub(
                        topic=known_args.input_topic)
                    .with_output_types(bytes))

    # Group messages into fixed-size windows. FixedWindows is sized in
    # seconds, so convert the minute-based flag value.
    transformed = (messages
                   | beam.WindowInto(
                       window.FixedWindows(known_args.window_size * 60))
                   .with_output_types(bytes))

    # Write each window's messages to GCS under the given path prefix.
    transformed | beam.io.WriteToText(file_path_prefix=known_args.output_path)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':  # noqa
    logging.getLogger().setLevel(logging.INFO)
    run()
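For a quick end-to-end check it helps to have a few messages on the input topic before starting the pipeline. The snippet below is a minimal sketch using the google-cloud-pubsub client library, which is not part of this commit's requirements; the project and topic IDs are placeholders.

```python
# Sketch (assumption): publish a handful of test messages so the streaming
# pipeline above has something to window and write. Requires
# `pip install google-cloud-pubsub`; project and topic IDs are placeholders.
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')

for i in range(10):
    data = 'message {}'.format(i).encode('utf-8')  # Pub/Sub payloads are bytes
    publisher.publish(topic_path, data).result()   # block until accepted
```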

pubsub/streaming-analytics/README.md

Lines changed: 7 additions & 4 deletions
@@ -106,14 +106,17 @@ The following example will run a streaming pipeline. It will read messages from
 + `--output`: sets the output GCS path prefix to write files to
 + `--runner [optional]`: specifies the runner to run the pipeline, defaults to `DirectRunner`
 + `--windowSize [optional]`: specifies the window size in minutes, defaults to 1
++ `--temp_location`: needed for execution of the pipeline

 ```bash
-python PubSubToGCS.py \
+python -m PubSubToGCS \
 --project=$PROJECT_NAME \
---inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
---output=gs://$BUCKET_NAME/samples/output \
+--input_topic=projects/$PROJECT_NAME/topics/june \
+--output_path=gs://$BUCKET_NAME/labor \
 --runner=DataflowRunner \
---windowSize=2
+--window_size=2 \
+--temp_location=gs://$BUCKET_NAME/temp \
+--experiments=allow_non_updateable_job
 ```

 After the job has been submitted, you can check its status in the [GCP Console Dataflow page].
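The command mixes the sample's own flags (`--input_topic`, `--output_path`, `--window_size`) with flags that `parse_known_args` leaves for Beam's `PipelineOptions` (`--project`, `--runner`, `--temp_location`, `--experiments`), which is why `--temp_location` matters even though the script never declares it. A small sketch of that split, with placeholder values:

```python
# Sketch: how parse_known_args splits the README command's flags between the
# sample's arguments and Beam's PipelineOptions. Values are placeholders.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input_topic')
parser.add_argument('--window_size', type=int, default=1)
parser.add_argument('--output_path')

argv = [
    '--project=my-project',                               # -> PipelineOptions
    '--input_topic=projects/my-project/topics/my-topic',  # -> known_args
    '--output_path=gs://my-bucket/samples/output',        # -> known_args
    '--runner=DataflowRunner',                            # -> PipelineOptions
    '--window_size=2',                                    # -> known_args
    '--temp_location=gs://my-bucket/temp',                # -> PipelineOptions
]
known_args, pipeline_args = parser.parse_known_args(argv)
print(known_args)     # input_topic, output_path, window_size=2
print(pipeline_args)  # ['--project=...', '--runner=...', '--temp_location=...']
```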
pubsub/streaming-analytics/requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -1 +1,2 @@
 google-api-python-client==1.7.9
+apache-beam==2.15.0
