+ # Copyright 2019 Google Inc.
+ #
+ # Licensed under the Apache License, Version 2.0 (the "License");
+ # you may not use this file except in compliance with the License.
+ # You may obtain a copy of the License at
+ #
+ #     http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+ # [START pubsub_to_gcs]
import argparse
import datetime
import json


class GroupWindowsIntoBatches(beam.PTransform):
-     """A composite transform that enriches Pub/Sub messages by publish
-     timestamps and then groups them based on their window information.
+     """A composite transform that groups Pub/Sub messages based on publish
+     time and outputs a list of dictionaries, where each contains one message
+     and its publish timestamp.
    """

    def __init__(self, window_size):
@@ -19,32 +35,23 @@ def __init__(self, window_size):

    def expand(self, pcoll):
        return (pcoll
-                 # Assigns window info to each element in the PCollection.
+                 # Assigns window info to each Pub/Sub message based on its
+                 # publish timestamp.
                | beam.WindowInto(window.FixedWindows(self.window_size))
-                 # Transform each element by adding publish timestamp info.
                | 'Process Pub/Sub Message' >> (beam.ParDo(AddTimestamps()))
-                 # Use a dummy key to group all the elements in this window.
+                 # Use a dummy key to group the elements in the same window.
                | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
                | 'Groupby' >> beam.GroupByKey()
                | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))


class AddTimestamps(beam.DoFn):
-     """A distributed processing function that acts on the elements in
-     the PCollection.
-     """

    def process(self, element, publish_time=beam.DoFn.TimestampParam):
-         """Enrich each Pub/Sub message with its publish timestamp.
-
-         Args:
-             element (bytes): A Pub/Sub message.
-             publish_time: Default to the publish timestamp returned by the
-                 Pub/Sub server that's been bound to the message by Apache Beam
-                 at runtime.
-
-         Yields:
-             dict of str: Message body and publish timestamp.
+         """Processes each incoming windowed element by extracting the Pub/Sub
+         message body and its publish timestamp into a dictionary.
+         `publish_time` defaults to the publish timestamp returned by the
+         Pub/Sub server and is accessible by Beam at runtime.
        """

        yield {
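
As an aside, the dummy-key grouping used in the composite transform above can be sketched in isolation. A minimal sketch, not part of the sample; the `group_into_batches` helper and the `messages` PCollection are hypothetical names:

# Illustrative sketch: GroupByKey operates on key/value pairs, so a throwaway
# None key gathers every element of a 60-second fixed window into a single
# iterable, and MapTuple then drops the key again.
import apache_beam as beam
from apache_beam.transforms import window

def group_into_batches(messages):
    return (messages
            | beam.WindowInto(window.FixedWindows(60))
            | beam.Map(lambda elem: (None, elem))
            | beam.GroupByKey()
            | beam.MapTuple(lambda _, elems: elems))
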
@@ -60,13 +67,7 @@ def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
-         """Write one batch per file to a Google Cloud Storage bucket.
-
-         Args:
-             batch (list of dict): Each dictionary contains one message and its
-                 publish timestamp.
-             window: Window inforamtion bound to the batch.
-         """
+         """Write one batch per file to a Google Cloud Storage bucket."""

        ts_format = '%H:%M'
        window_start = window.start.to_utc_datetime().strftime(ts_format)
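
As a side note, `window.start` is a Beam Timestamp, so the conversion above can be sanity-checked outside the pipeline. A minimal sketch, not part of the sample; the epoch value is only an example:

# Illustrative only: a window boundary at 2019-01-01 10:30:00 UTC rendered
# with the same '%H:%M' format used above.
from apache_beam.utils.timestamp import Timestamp

print(Timestamp(1546338600).to_utc_datetime().strftime('%H:%M'))  # prints '10:30'
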
@@ -95,9 +96,8 @@ def run(argv=None):

    # `save_main_session` is set to true because one or more DoFn's rely on
    # globally imported modules.
-     pipeline_options = PipelineOptions(pipeline_args,
-                                        streaming=True,
-                                        save_main_session=True)
+     pipeline_options = PipelineOptions(
+         pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
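
As an aside, constructing PipelineOptions with keyword arguments should be equivalent to appending the matching Beam flags, since both `--streaming` and `--save_main_session` are boolean options. A minimal sketch, not part of the sample:

# Illustrative only: flags and keyword arguments set the same options; this
# should yield streaming=True and save_main_session=True when viewed through
# the corresponding option classes.
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(['--streaming', '--save_main_session'])
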
@@ -111,3 +111,4 @@ def run(argv=None):
if __name__ == '__main__':  # noqa
    logging.getLogger().setLevel(logging.INFO)
    run()
+ # [END pubsub_to_gcs]