nits: license header, docstring, indentation · hanv89/python-docs-samples@b67e182 · GitHub
[go: up one dir, main page]

Skip to content

Commit b67e182

Browse files
committed
nits: license header, docstring, indentation
1 parent b20d28d commit b67e182

File tree

1 file changed

+29
-28
lines changed

1 file changed

+29
-28
lines changed

pubsub/streaming-analytics/PubSubToGCS.py

Lines changed: 29 additions & 28 deletions
</tr>
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
# Copyright 2019 Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# [START pubsub_to_gcs]
116
import argparse
217
import datetime
318
import json
@@ -9,8 +24,9 @@
924

1025

1126
class GroupWindowsIntoBatches(beam.PTransform):
12-
"""A composite transform that enriches Pub/Sub messages by publish
13-
timestamps and then groups them based on their window information.
27+
"""A composite transform that groups Pub/Sub messages based on publish
28+
time and outputs a list of dictionaries, where each contains one message
29+
and its publish timestamp.
1430
"""
1531

1632
def __init__(self, window_size):
@@ -19,32 +35,23 @@ def __init__(self, window_size):
1935

2036
def expand(self, pcoll):
2137
return (pcoll
22-
# Assigns window info to each element in the PCollection.
38+
# Assigns window info to each Pub/Sub message based on its
39+
# publish timestamp.
2340
| beam.WindowInto(window.FixedWindows(self.window_size))
24-
# Transform each element by adding publish timestamp info.
2541
| 'Process Pub/Sub Message' >> (beam.ParDo(AddTimestamps()))
26-
# Use a dummy key to group all the elements in this window.
42+
# Use a dummy key to group the elements in the same window.
2743
| 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
2844
| 'Groupby' >> beam.GroupByKey()
2945
| 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))
3046

3147

3248
class AddTimestamps(beam.DoFn):
33-
"""A distributed processing function that acts on the elements in
34-
the PCollection.
35-
"""
3649

3750
def process(self, element, publish_time=beam.DoFn.TimestampParam):
38-
"""Enrich each Pub/Sub message with its publish timestamp.
39-
40-
Args:
41-
element (bytes): A Pub/Sub message.
42-
publish_time: Default to the publish timestamp returned by the
43-
Pub/Sub server that's been bound to the message by Apache Beam
44-
at runtime.
45-
46-
Yields:
47-
dict of str: Message body and publish timestamp.
51+
"""Processes each incoming windowed element by extracting the Pub/Sub
52+
message body and its publish timestamp into a dictionary.
53+
`publish_time` defaults to the publish timestamp returned by the
54+
Pub/Sub server and is accessible by Beam at runtime.
4855
"""
4956

5057
yield {
@@ -60,13 +67,7 @@ def __init__(self, output_path):
6067
self.output_path = output_path
6168

6269
def process(self, batch, window=beam.DoFn.WindowParam):
63-
"""Write one batch per file to a Google Cloud Storage bucket.
64-
65-
Args:
66-
batch (list of dict): Each dictionary contains one message and its
67-
publish timestamp.
68-
window: Window information bound to the batch.
69-
"""
70+
"""Write one batch per file to a Google Cloud Storage bucket. """
7071

7172
ts_format = '%H:%M'
7273
window_start = window.start.to_utc_datetime().strftime(ts_format)
@@ -95,9 +96,8 @@ def run(argv=None):
9596

9697
# `save_main_session` is set to true because one or more DoFn's rely on
9798
# globally imported modules.
98-
pipeline_options = PipelineOptions(pipeline_args,
99-
streaming=True,
100-
save_main_session=True)
99+
pipeline_options = PipelineOptions(
100+
pipeline_args, streaming=True, save_main_session=True)
101101

102102
with beam.Pipeline(options=pipeline_options) as pipeline:
103103
(pipeline
@@ -111,3 +111,4 @@ def run(argv=None):
111111
if __name__ == '__main__': # noqa
112112
logging.getLogger().setLevel(logging.INFO)
113113
run()
114+
# [END pubsub_to_gcs]

0 commit comments

Comments
 (0)
0