@@ -998,7 +998,7 @@ def expand(self, pcoll):
998
998
# type: (pvalue.PCollection[InT]) -> pvalue.PCollection[None]
999
999
do_once = pcoll .pipeline | 'DoOnce' >> core .Create ([None ])
1000
1000
init_result_coll = do_once | 'InitializeWrite' >> core .Map (
1001
- lambda _ , sink : sink .initialize_write (), self . sink )
1001
+ lambda _ , sink = self . sink : sink .initialize_write ())
1002
1002
if getattr (self .sink , 'num_shards' , 0 ):
1003
1003
min_shards = self .sink .num_shards
1004
1004
if min_shards == 1 :
@@ -1029,9 +1029,9 @@ def getitem(x):
1029
1029
# fused.
1030
1030
pre_finalize_coll = do_once | 'PreFinalize' >> core .FlatMap (
1031
1031
_pre_finalize ,
1032
- self .sink ,
1033
- AsSingleton (init_result_coll ),
1034
- AsIter (write_result_coll ))
1032
+ sink = self .sink ,
1033
+ init_result = AsSingleton (init_result_coll ),
1034
+ write_results = AsIter (write_result_coll ))
1035
1035
return do_once | 'FinalizeWrite' >> core .FlatMap (
1036
1036
_finalize_write ,
1037
1037
self .sink ,
@@ -1084,12 +1084,17 @@ def process(self, element, init_result):
1084
1084
return [window .TimestampedValue (writer .close (), timestamp .MAX_TIMESTAMP )]
1085
1085
1086
1086
1087
- def _pre_finalize (unused_element , sink , init_result , write_results ):
1087
+ # FIXME: when we migrate to python3-only, change this to:
1088
+ # def _pre_finalize(unused_element, *, sink, init_result, write_results):
1089
+ def _pre_finalize (unused_element , sink = None , init_result = None ,
1090
+ write_results = None ):
1088
1091
return sink .pre_finalize (init_result , write_results )
1089
1092
1090
-
1091
- def _finalize_write (unused_element , sink , init_result , write_results ,
1092
- min_shards , pre_finalize_results ):
1093
+ # FIXME: when we migrate to python3-only, change this to:
1094
+ # def _finalize_write(unused_element, *, sink, init_result, ...):
1095
+ def _finalize_write (unused_element , sink = None , init_result = None ,
1096
+ write_results = None , min_shards = None ,
1097
+ pre_finalize_results = None ):
1093
1098
write_results = list (write_results )
1094
1099
extra_shards = []
1095
1100
if len (write_results ) < min_shards :
0 commit comments