8000 Fix incompatible with super type · apache/beam@e3a5471 · GitHub
[go: up one dir, main page]

Skip to content

Commit e3a5471

Browse files
committed
Fix incompatible with super type
1 parent 646931b commit e3a5471

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

sdks/python/apache_beam/io/iobase.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,7 @@ def expand(self, pcoll):
998998
# type: (pvalue.PCollection[InT]) -> pvalue.PCollection[None]
999999
do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
10001000
init_result_coll = do_once | 'InitializeWrite' >> core.Map(
1001-
lambda _, sink: sink.initialize_write(), self.sink)
1001+
lambda _, sink=self.sink: sink.initialize_write())
10021002
if getattr(self.sink, 'num_shards', 0):
10031003
min_shards = self.sink.num_shards
10041004
if min_shards == 1:
@@ -1029,9 +1029,9 @@ def getitem(x):
10291029
# fused.
10301030
pre_finalize_coll = do_once | 'PreFinalize' >> core.FlatMap(
10311031
_pre_finalize,
1032-
self.sink,
1033-
AsSingleton(init_result_coll),
1034-
AsIter(write_result_coll))
1032+
sink=self.sink,
1033+
init_result=AsSingleton(init_result_coll),
1034+
write_results=AsIter(write_result_coll))
10351035
return do_once | 'FinalizeWrite' >> core.FlatMap(
10361036
_finalize_write,
10371037
self.sink,
@@ -1084,12 +1084,17 @@ def process(self, element, init_result):
10841084
return [window.TimestampedValue(writer.close(), timestamp.MAX_TIMESTAMP)]
10851085

10861086

1087-
def _pre_finalize(unused_element, sink, init_result, write_results):
1087+
# FIXME: when we migrate to python3-only, change this to:
1088+
# def _pre_finalize(unused_element, *, sink, init_result, write_results):
1089+
def _pre_finalize(unused_element, sink=None, init_result=None,
1090+
write_results=None):
10881091
return sink.pre_finalize(init_result, write_results)
10891092

1090-
1091-
def _finalize_write(unused_element, sink, init_result, write_results,
1092-
min_shards, pre_finalize_results):
1093+
# FIXME: when we migrate to python3-only, change this to:
1094+
# def _finalize_write(unused_element, *, sink, init_result, ...):
1095+
def _finalize_write(unused_element, sink=None, init_result=None,
1096+
write_results=None, min_shards=None,
1097+
pre_finalize_results=None):
10931098
write_results = list(write_results)
10941099
extra_shards = []
10951100
if len(write_results) < min_shards:

sdks/python/apache_beam/transforms/combiners.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -401,9 +401,9 @@ def __init__(self, n, less_than, key):
401401
self._less_than = None if less_than is operator.lt else less_than
402402
self._key = key
403403

404-
def process(self, key_and_bundles):
405-
# type: (Tuple[None, Iterable[List[T]]]) -> Iterator[List[T]]
406-
_, bundles = key_and_bundles
404+
def process(self, element, *args, **kwargs):
405+
# type: (Tuple[None, Iterable[List[T]]], *Any, **Any) -> Iterator[List[T]]
406+
_, bundles = element
407407

408408
def push(hp, e):
409409
if len(hp) < self._n:
@@ -422,12 +422,12 @@ def push(hp, e):
422422
for bundle in bundles:
423423
if not heapc:
424424
heapc = [
425-
cy_combiners.ComparableValue(element, self._less_than, self._key)
426-
for element in bundle]
425+
cy_combiners.ComparableValue(item, self._less_than, self._key)
426+
for item in bundle]
427427
continue
428-
for element in reversed(bundle):
428+
for item in reversed(bundle):
429429
if push(heapc, cy_combiners.ComparableValue(
430-
element, self._less_than, self._key)):
430+
item, self._less_than, self._key)):
431431
break
432432
heapc.sort()
433433
yield [wrapper.value for wrapper in reversed(heapc)]
@@ -438,8 +438,8 @@ def push(hp, e):
438438
if not heap:
439439
heap = bundle
440440
continue
441-
for element in reversed(bundle):
442-
if push(heap, element):
441+
for item in reversed(bundle):
442+
if push(heap, item):
443443
break
444444
heap.sort()
445445
yield heap[::-1]

0 commit comments

Comments
 (0)
0