1
1
import argparse
2
+ import datetime
2
3
import logging
3
4
4
5
import apache_beam as beam
8
9
from apache_beam .options .pipeline_options import StandardOptions
9
10
10
11
12
+ class ProcessDoFn (beam .DoFn ):
13
+ def process (self , element , publish_time = beam .DoFn .TimestampParam ):
14
+ publish_time = datetime .datetime .utcfromtimestamp (
15
+ float (publish_time )).strftime ("%Y-%m-%d %H:%M:%S.%f" )
16
+ yield element .decode ('utf-8' ) + ' published at ' + publish_time
17
+
18
+
19
+ class CustomDoFn (beam .DoFn ):
20
+ def __init__ (self , output_path ):
21
+ self .output_path = output_path
22
+
23
+ def process (self , batch , window = beam .DoFn .WindowParam ):
24
+ ts_format = '%Y-%m-%d %H:%M:%S'
25
+ window_start = window .start .to_utc_datetime ().strftime (ts_format )
26
+ window_end = window .end .to_utc_datetime ().strftime (ts_format )
27
+ filename = self .output_path + ' ' + window_start + ' to ' + window_end
28
+ contents = '\n ' .join (str (element ) for element in batch )
29
+
30
+ with beam .io .gcp .gcsio .GcsIO ().open (filename = filename , mode = 'w' ) as f :
31
+ f .write (contents .encode ('utf-8' ))
32
+
33
+
11
34
def run (argv = None ):
12
35
parser = argparse .ArgumentParser ()
13
36
parser .add_argument (
@@ -28,26 +51,17 @@ def run(argv=None):
28
51
pipeline_options = PipelineOptions (pipeline_args )
29
52
pipeline_options .view_as (SetupOptions ).save_main_session = True
30
53
pipeline_options .view_as (StandardOptions ).streaming = True
31
- p = beam .Pipeline (options = pipeline_options )
32
-
33
- # Read from Cloud Pub/Sub into a PCollection.
34
- if known_args .input_topic :
35
- messages = (p
36
- | 'Read Pub/Sub Messages' >> beam .io .ReadFromPubSub (
37
- topic = known_args .input_topic )
38
- .with_output_types (bytes ))
39
-
40
- # Group messages by fixed-sized minute intervals.
41
- transformed = (messages
42
- | beam .WindowInto (
43
- window .FixedWindows (known_args .window_size ))
44
- .with_output_types (bytes ))
45
-
46
- # Output to GCS
47
- transformed | beam .io .WriteToText (file_path_prefix = known_args .output_path )
48
-
49
- result = p .run ()
50
- result .wait_until_finish ()
54
+
55
+ with beam .Pipeline (options = pipeline_options ) as p :
56
+ (p
57
+ | 'Read Pub/Sub Messages' >> beam .io .ReadFromPubSub (
58
+ topic = known_args .input_topic )
59
+ | beam .WindowInto (window .FixedWindows (known_args .window_size * 60 ))
60
+ | beam .ParDo (ProcessDoFn ())
61
+ | beam .Map (lambda e : (None , e ))
62
+ | beam .GroupByKey ()
63
+ | beam .MapTuple (lambda k , v : v )
64
+ | 'Write to GCS' >> beam .ParDo (CustomDoFn (known_args .output_path )))
51
65
52
66
if __name__ == '__main__' : # noqa
53
67
logging .getLogger ().setLevel (logging .INFO )
0 commit comments