8000 Revert "[BEAM-7060] Migrate to native typing types where possible." · apache/beam@8afee95 · GitHub
[go: up one dir, main page]

Skip to content

Commit 8afee95

Browse files
authored
Revert "[BEAM-7060] Migrate to native typing types where possible."
1 parent ab80cc5 commit 8afee95

21 files changed

+135
-197
lines changed

sdks/python/apache_beam/coders/coders.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import base64
2525
import sys
26-
import typing
2726
from builtins import object
2827

2928
import google.protobuf.wrappers_pb2
@@ -598,7 +597,7 @@ def as_deterministic_coder(self, step_label, error_message=None):
598597
return DeterministicFastPrimitivesCoder(self, step_label)
599598

600599
def to_type_hint(self):
601-
return typing.Any
600+
return typehints.Any
602601

603602

604603
class DillCoder(_PickleCoderBase):
@@ -632,7 +631,7 @@ def value_coder(self):
632631
return self
633632

634633
def to_type_hint(self):
635-
return typing.Any
634+
return typehints.Any
636635

637636

638637
class FastPrimitivesCoder(FastCoder):
@@ -657,7 +656,7 @@ def as_deterministic_coder(self, step_label, error_message=None):
657656
return DeterministicFastPrimitivesCoder(self, step_label)
658657

659658
def to_type_hint(self):
660-
return typing.Any
659+
return typehints.Any
661660

662661
def as_cloud_object(self, coders_context=None, is_pair_like=True):
663662
value = super(FastCoder, self).as_cloud_object(coders_context)
@@ -1188,4 +1187,4 @@ def to_runner_api(self, context):
11881187
return self._proto
11891188

11901189
def to_type_hint(self):
1191-
return typing.Any
1190+
return typehints.Any

sdks/python/apache_beam/examples/complete/estimate_pi.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@
3333
import random
3434
from builtins import object
3535
from builtins import range
36-
from typing import Any
37-
from typing import Iterable
38-
from typing import Tuple
3936

4037
import apache_beam as beam
4138
from apache_beam.io import WriteToText
4239
from apache_beam.options.pipeline_options import PipelineOptions
4340
from apache_beam.options.pipeline_options import SetupOptions
41+
from apache_beam.typehints import Any
42+
from apache_beam.typehints import Iterable
43+
from apache_beam.typehints import Tuple
4444

4545

4646
@beam.typehints.with_output_types(Tuple[int, int, int])

sdks/python/apache_beam/examples/cookbook/group_with_coder.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import argparse
3131
import logging
3232
import sys
33-
import typing
3433
from builtins import object
3534

3635
import apache_beam as beam
@@ -39,6 +38,7 @@
3938
from apache_beam.io import WriteToText
4039
from apache_beam.options.pipeline_options import PipelineOptions
4140
from apache_beam.options.pipeline_options import SetupOptions
41+
from apache_beam.typehints import typehints
4242
from apache_beam.typehints.decorators import with_output_types
4343

4444

@@ -74,7 +74,7 @@ def is_deterministic(self):
7474
# Annotate the get_players function so that the typehint system knows that the
7575
# input to the CombinePerKey operation is a key-value pair of a Player object
7676
# and an integer.
77-
@with_output_types(typing.Tuple[Player, int])
77+
@with_output_types(typehints.KV[Player, int])
7878
def get_players(descriptor):
7979
name, points = descriptor.split(',')
8080
return Player(name), int(points)

sdks/python/apache_beam/examples/snippets/snippets_test.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import os
2727
import sys
2828
import tempfile
29-
import typing
3029
import unittest
3130
import uuid
3231
from builtins import map
@@ -322,10 +321,10 @@ def process(self, element):
322321
# One can assert outputs and apply them to transforms as well.
323322
# Helps document the contract and checks it at pipeline construction time.
324323
# [START type_hints_transform]
325-
T = typing.TypeVar('T')
324+
T = beam.typehints.TypeVariable('T')
326325

327326
@beam.typehints.with_input_types(T)
328-
@beam.typehints.with_output_types(typing.Tuple[int, T])
327+
@beam.typehints.with_output_types(beam.typehints.Tuple[int, T])
329328
class MyTransform(beam.PTransform):
330329
def expand(self, pcoll):
331330
return pcoll | beam.Map(lambda x: (len(x), x))
@@ -336,7 +335,7 @@ def expand(self, pcoll):
336335
# pylint: disable=expression-not-assigned
337336
with self.assertRaises(typehints.TypeCheckError):
338337
words_with_lens | beam.Map(lambda x: x).with_input_types(
339-
typing.Tuple[int, int])
338+
beam.typehints.Tuple[int, int])
340339

341340
def test_runtime_checks_off(self):
342341
# We do not run the following pipeline, as it has incorrect type
@@ -392,7 +391,7 @@ def parse_player_and_score(csv):
392391
lines
393392
| beam.Map(parse_player_and_score)
394393
| beam.CombinePerKey(sum).with_input_types(
395-
typing.Tuple[Player, int]))
394+
beam.typehints.Tuple[Player, int]))
396395
# [END type_hints_deterministic_key]
397396

398397
assert_that(

sdks/python/apache_beam/runners/direct/direct_runner.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import itertools
2727
import logging
2828
import time
29-
import typing
3029

3130
from google.protobuf import wrappers_pb2
3231

@@ -130,12 +129,12 @@ def visit_transform(self, applied_ptransform):
130129

131130

132131
# Type variables.
133-
K = typing.TypeVar('K')
134-
V = typing.TypeVar('V')
132+
K = typehints.TypeVariable('K')
133+
V = typehints.TypeVariable('V')
135134

136135

137-
@typehints.with_input_types(typing.Tuple[K, V])
138-
@typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
136+
@typehints.with_input_types(typehints.KV[K, V])
137+
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
139138
class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
140139
"""Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
141140
urn = "direct_runner:streaming_gbko:v0.1"
@@ -149,8 +148,8 @@ def from_runner_api_parameter(unused_payload, unused_context):
149148
return _StreamingGroupByKeyOnly()
150149

151150

152-
@typehints.with_input_types(typing.Tuple[K, typing.Iterable[V]])
153-
@typehints.with_output_types(typing.Tuple[K, typing.Iterable[V]])
151+
@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
152+
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
154153
class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
155154
"""Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
156155
urn = "direct_runner:streaming_gabw:v0.1"

sdks/python/apache_beam/runners/direct/helper_transforms.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
import collections
2121
import itertools
22-
import typing
2322

2423
import apache_beam as beam
24+
from apache_beam import typehints
2525
from apache_beam.internal.util import ArgumentPlaceholder
2626
from apache_beam.transforms.combiners import _CurriedFn
2727
from apache_beam.utils.windowed_value import WindowedValue
@@ -75,14 +75,14 @@ def finish_bundle(self):
7575

7676
def default_type_hints(self):
7777
hints = self._combine_fn.get_type_hints().copy()
78-
K = typing.TypeVar('K')
78+
K = typehints.TypeVariable('K')
7979
if hints.input_types:
8080
args, kwargs = hints.input_types
81-
args = (typing.Tuple[K, args[0]],) + args[1:]
81+
args = (typehints.Tuple[K, args[0]],) + args[1:]
8282
hints.set_input_types(*args, **kwargs)
8383
else:
84-
hints.set_input_types(typing.Tuple[K, typing.Any])
85-
hints.set_output_types(typing.Tuple[K, typing.Any])
84+
hints.set_input_types(typehints.Tuple[K, typehints.Any])
85+
hints.set_output_types(typehints.Tuple[K, typehints.Any])
8686
return hints
8787

8888

@@ -101,9 +101,9 @@ def process(self, element):
101101

102102
def default_type_hints(self):
103103
hints = self._combine_fn.get_type_hints().copy()
104-
K = typing.TypeVar('K')
105-
hints.set_input_types(typing.Tuple[K, typing.Any])
104+
K = typehints.TypeVariable('K')
105+
hints.set_input_types(typehints.Tuple[K, typehints.Any])
106106
if hints.output_types:
107107
main_output_type = hints.simple_output_type('')
108-
hints.set_output_types(typing.Tuple[K, main_output_type])
108+
hints.set_output_types(typehints.Tuple[K, main_output_type])
109109
return hints

sdks/python/apache_beam/runners/direct/transform_evaluator.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
import logging
2424
import random
2525
import time
26-
import typing
2726
from builtins import object
2827

2928
from future.utils import iteritems
3029

3130
import apache_beam.io as io
3231
from apache_beam import coders
3332
from apache_beam import pvalue
33+
from apache_beam import typehints
3434
from apache_beam.internal import pickler
3535
from apache_beam.runners import common
3636
from apache_beam.runners.common import DoFnRunner
@@ -602,11 +602,11 @@ def start_bundle(self):
602602
self.user_timer_map = {}
603603
if is_stateful_dofn(dofn):
604604
kv_type_hint = self._applied_ptransform.inputs[0].element_type
605-
if kv_type_hint and kv_type_hint != typing.Any:
605+
if kv_type_hint and kv_type_hint != typehints.Any:
606606
coder = coders.registry.get_coder(kv_type_hint)
607607
self.key_coder = coder.key_coder()
608608
else:
609-
self.key_coder = coders.registry.get_coder(typing.Any)
609+
self.key_coder = coders.registry.get_coder(typehints.Any)
610610

611611
self.user_state_context = DirectUserStateContext(
612612
self._step_context, dofn, self.key_coder)
@@ -669,7 +669,7 @@ def start_bundle(self):
669669
assert len(self._outputs) == 1
670670
self.output_pcollection = list(self._outputs)[0]
671671

672-
# The output type of a GroupByKey will be Tuple[Any, Any] or more specific.
672+
# The output type of a GroupByKey will be KV[Any, Any] or more specific.
673673
# TODO(BEAM-2717): Infer coders earlier.
674674
kv_type_hint = (
675675
self._applied_ptransform.outputs[None].element_type
@@ -759,10 +759,10 @@ def start_bundle(self):
759759
assert len(self._outputs) == 1
760760
self.output_pcollection = list(self._outputs)[0]
761761

762-
# The input type of a GroupByKey will be Tuple[Any, Any] or more specific.
762+
# The input type of a GroupByKey will be KV[Any, Any] or more specific.
763763
kv_type_hint = self._applied_ptransform.inputs[0].element_type
764764
key_type_hint = (kv_type_hint.tuple_types[0] if kv_type_hint
765-
else typing.Any)
765+
else typehints.Any)
766766
self.key_coder = coders.registry.get_coder(key_type_hint)
767767

768768
def process_element(self, element):
@@ -815,10 +815,10 @@ def start_bundle(self):
815815
self.keyed_holds = {}
816816

817817
# The input type (which is the same as the output type) of a
818-
# GroupAlsoByWindow will be Tuple[Any, Iter[Any]] or more specific.
818+
# GroupAlsoByWindow will be KV[Any, Iter[Any]] or more specific.
819819
kv_type_hint = self._applied_ptransform.outputs[None].element_type
820820
key_type_hint = (kv_type_hint.tuple_types[0] if kv_type_hint
821-
else typing.Any)
821+
else typehints.Any)
822822
self.key_coder = coders.registry.get_coder(key_type_hint)
823823

824824
def process_element(self, element):

sdks/python/apache_beam/runners/pipeline_context.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
from apache_beam.portability.api import beam_fn_api_pb2
3232
from apache_beam.portability.api import beam_runner_api_pb2
3333
from apache_beam.transforms import core
34-
from apache_beam.typehints import native_type_compatibility
3534

3635

3736
class Environment(object):
@@ -165,8 +164,7 @@ def element_type_from_coder_id(self, coder_id):
165164
if self.use_fake_coders or coder_id not in self.coders:
166165
return pickler.loads(coder_id)
167166
else:
168-
return native_type_compatibility.convert_to_beam_type(
169-
self.coders[coder_id].to_type_hint())
167+
return self.coders[coder_id].to_type_hint()
170168

171169
@staticmethod
172170
def from_runner_api(proto):

sdks/python/apache_beam/runners/portability/fn_api_runner_test.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import threading
2727
import time
2828
import traceback
29-
import typing
3029
import unittest
3130
import uuid
3231
from builtins import range
@@ -258,10 +257,10 @@ def test_multimap_side_input_type_coercion(self):
258257
with self.create_pipeline() as p:
259258
main = p | 'main' >> beam.Create(['a', 'b'])
260259
# The type of this side-input is forced to Any (overriding type
261-
# inference). Without type coercion to Tuple[Any, Any], the usage of this
260+
# inference). Without type coercion to KV[Any, Any], the usage of this
262261
# side-input in AsMultiMap() below should fail.
263262
side = (p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
264-
.with_output_types(typing.Any))
263+
.with_output_types(beam.typehints.Any))
265264
assert_that(
266265
main | beam.Map(lambda k, d: (k, sorted(d[k])),
267266
beam.pvalue.AsMultiMap(side)),

sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,14 +763,14 @@ def make_stage(base_stage, transform_id, extra_must_follow=()):
763763
main_input_id = transform.inputs[main_input_tag]
764764
element_coder_id = context.components.pcollections[
765765
main_input_id].coder_id
766-
# Tuple[element, restriction]
766+
# KV[element, restriction]
767767
paired_coder_id = context.add_or_get_coder_id(
768768
beam_runner_api_pb2.Coder(
769769
spec=beam_runner_api_pb2.FunctionSpec(
770770
urn=common_urns.coders.KV.urn),
771771
component_coder_ids=[element_coder_id,
772772
pardo_payload.restriction_coder_id]))
773-
# Tuple[Tuple[element, restriction], double]
773+
# KV[KV[element, restriction], double]
774774
sized_coder_id = context.add_or_get_coder_id(
775775
beam_runner_api_pb2.Coder(
776776
spec=beam_runner_api_pb2.FunctionSpec(

0 commit comments

Comments
 (0)
0