fixes · apache/beam@70357d5 · GitHub
[go: up one dir, main page]

Skip to content

Commit 70357d5

Browse files
committed
fixes
1 parent b768dce commit 70357d5

File tree

11 files changed

+135
-34
lines changed

11 files changed

+135
-34
lines changed

sdks/python/apache_beam/coders/slow_stream.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import sys
2626
from builtins import chr
2727
from builtins import object
28+
from typing import List
2829

2930

3031
class OutputStream(object):
@@ -33,10 +34,11 @@ class OutputStream(object):
3334
A pure Python implementation of stream.OutputStream."""
3435

3536
def __init__(self):
36-
self.data = []
37+
self.data = [] # type: List[bytes]
3738
self.byte_count = 0
3839

3940
def write(self, b, nested=False):
41+
# type: (bytes, bool) -> None
4042
assert isinstance(b, bytes)
4143
if nested:
4244
self.write_var_int64(len(b))
@@ -48,6 +50,7 @@ def write_byte(self, val):
4850
self.byte_count += 1
4951

5052
def write_var_int64(self, v):
53+
# type: (int) -> None
5154
if v < 0:
5255
v += 1 << 64
5356
if v <= 0:
@@ -74,12 +77,15 @@ def write_bigendian_double(self, v):
7477
self.write(struct.pack('>d', v))
7578

7679
def get(self):
80+
# type: () -> bytes
7781
return b''.join(self.data)
7882

7983
def size(self):
84+
# type: () -> int
8085
return self.byte_count
8186

8287
def _clear(self):
88+
# type: () -> None
8389
self.data = []
8490
self.byte_count = 0
8591

@@ -95,6 +101,7 @@ def __init__(self):
95101
self.count = 0
96102

97103
def write(self, byte_array, nested=False):
104+
# type: (bytes, bool) -> None
98105
blen = len(byte_array)
99106
if nested:
100107
self.write_var_int64(blen)
@@ -119,6 +126,7 @@ class InputStream(object):
119126
A pure Python implementation of stream.InputStream."""
120127

121128
def __init__(self, data):
129+
# type: (bytes) -> None
122130
self.data = data
123131
self.pos = 0
124132

@@ -139,17 +147,21 @@ def size(self):
139147
return len(self.data) - self.pos
140148

141149
def read(self, size):
150+
# type: (int) -> bytes
142151
self.pos += size
143152
return self.data[self.pos - size : self.pos]
144153

145154
def read_all(self, nested):
155+
# type: (bool) -> bytes
146156
return self.read(self.read_var_int64() if nested else self.size())
147157

148158
def read_byte_py2(self):
159+
# type: () -> int
149160
self.pos += 1
150161
return ord(self.data[self.pos - 1])
151162

152163
def read_byte_py3(self):
164+
# type: () -> int
153165
self.pos += 1
154166
return self.data[self.pos - 1]
155167

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
from typing import Dict
2929
from typing import List
3030
from typing import Optional
31+
from typing import Type
32+
from typing import TypeVar
3133

3234
from apache_beam.options.value_provider import RuntimeValueProvider
3335
from apache_beam.options.value_provider import StaticValueProvider
@@ -48,6 +50,8 @@
4850
'TestOptions',
4951
]
5052

53+
PipelineOptionsT = TypeVar('PipelineOptionsT', bound='PipelineOptions')
54+
5155

5256
def _static_value_provider_of(value_type):
5357
""""Helper function to plug a ValueProvider into argparse.
@@ -287,6 +291,7 @@ def display_data(self):
287291
return self.get_all_options(True)
288292

289293
def view_as(self, cls):
294+
# type: (Type[PipelineOptionsT]) -> PipelineOptionsT
290295
"""Returns a view of current object as provided PipelineOption subclass.
291296
292297
Example Usage::

sdks/python/apache_beam/runners/portability/fn_api_runner.py

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ def _collect_written_timers_and_add_to_deferred_inputs(
552552
stage,
553553
get_buffer_callable,
554554
deferred_inputs # type: DefaultDict[str, _ListBuffer]
555-
):
555+
):
556556

557557
for transform_id, timer_writes in stage.timer_pcollections:
558558

@@ -588,7 +588,7 @@ def _add_residuals_and_channel_splits_to_deferred_inputs(
588588
input_for_callable,
589589
last_sent,
590590
deferred_inputs # type: DefaultDict[str, _ListBuffer]
591-
):
591+
):
592592

593593
prev_stops = {} # type: Dict[str, int]
594594
for split in splits:
@@ -1079,7 +1079,7 @@ def __init__(self,
10791079
control_handler,
10801080
data_plane_handler,
10811081
state, # type: FnApiRunner.StateServicer
1082-
provision_info # type: fn_api_runner.ExtendedProvisionInfo
1082+
provision_info # type: Optional[fn_api_runner.ExtendedProvisionInfo]
10831083
):
10841084
"""Initialize a WorkerHandler.
10851085
@@ -1120,14 +1120,23 @@ def logging_api_service_descriptor(self):
11201120
raise NotImplementedError
11211121

11221122
@classmethod
1123-
def register_environment(cls, urn, payload_type):
1123+
def register_environment(cls,
1124+
urn, # type: str
1125+
payload_type # type: Optional[Type[T]]
1126+
):
1127+
# type: (...) -> Callable[[Callable[[T, FnApiRunner.StateServicer, Optional[fn_api_runner.ExtendedProvisionInfo], GrpcServer], WorkerHandler]], Callable[[T, FnApiRunner.StateServicer, Optional[fn_api_runner.ExtendedProvisionInfo], GrpcServer], WorkerHandler]]
11241128
def wrapper(constructor):
11251129
cls._registered_environments[urn] = constructor, payload_type
11261130
return constructor
11271131
return wrapper
11281132

11291133
@classmethod
1130-
def create(cls, environment, state, provision_info, grpc_server):
1134+
def create(cls,
1135+
environment, # type: beam_runner_api_pb2.Environment
1136+
state, # type: FnApiRunner.StateServicer
1137+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
1138+
grpc_server # type: GrpcServer
1139+
):
11311140
# type: (...) -> WorkerHandler
11321141
constructor, payload_type = cls._registered_environments[environment.urn]
11331142
return constructor(
@@ -1141,8 +1150,12 @@ def create(cls, environment, state, provision_info, grpc_server):
11411150
class EmbeddedWorkerHandler(WorkerHandler):
11421151
"""An in-memory worker_handler for fn API control, state and data planes."""
11431152

1144-
def __init__(self, unused_payload, state, provision_info,
1145-
unused_grpc_server=None):
1153+
def __init__(self,
1154+
unused_payload, # type: None
1155+
state, # type: sdk_worker.StateHandler
1156+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
1157+
unused_grpc_server=None
1158+
):
11461159
super(EmbeddedWorkerHandler, self).__init__(
11471160
self, data_plane.InMemoryDataChannel(), state, provision_info)
11481161
self.control_conn = self # type: ignore # need Protocol to describe this
@@ -1228,7 +1241,11 @@ class GrpcServer(object):
12281241

12291242
_DEFAULT_SHUTDOWN_TIMEOUT_SECS = 5
12301243

1231-
def __init__(self, state, provision_info, max_workers):
1244+
def __init__(self,
1245+
state,
1246+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
1247+
max_workers # type: int
1248+
):
12321249
self.state = state
12331250
self.provision_info = provision_info
12341251
self.max_workers = max_workers
@@ -1269,7 +1286,7 @@ def __init__(self, state, provision_info, max_workers):
12691286

12701287
if self.provision_info.artifact_staging_dir:
12711288
service = artifact_service.BeamFilesystemArtifactService(
1272-
self.provision_info.artifact_staging_dir)
1289+
self.provision_info.artifact_staging_dir) # type: beam_artifact_api_pb2_grpc.ArtifactRetrievalServiceServicer
12731290
else:
12741291
service = EmptyArtifactRetrievalService()
12751292
beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
@@ -1316,8 +1333,8 @@ class GrpcWorkerHandler(WorkerHandler):
13161333
"""An grpc based worker_handler for fn API control, state and data planes."""
13171334

13181335
def __init__(self,
1319-
state,
1320-
provision_info,
1336+
state, # type: FnApiRunner.StateServicer
1337+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
13211338
grpc_server # type: GrpcServer
13221339
):
13231340
self._grpc_server = grpc_server
@@ -1363,7 +1380,12 @@ def localhost_from_worker(self):
13631380
@WorkerHandler.register_environment(
13641381
common_urns.environments.EXTERNAL.urn, beam_runner_api_pb2.ExternalPayload)
13651382
class ExternalWorkerHandler(GrpcWorkerHandler):
1366-
def __init__(self, external_payload, state, provision_info, grpc_server):
1383+
def __init__(self,
1384+
external_payload, # type: beam_runner_api_pb2.ExternalPayload
1385+
state, # type: FnApiRunner.StateServicer
1386+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
1387+
grpc_server # type: GrpcServer
1388+
):
13671389
super(ExternalWorkerHandler, self).__init__(state, provision_info,
13681390
grpc_server)
13691391
self._external_payload = external_payload
@@ -1388,7 +1410,12 @@ def stop_worker(self):
13881410

13891411
@WorkerHandler.register_environment(python_urns.EMBEDDED_PYTHON_GRPC, bytes)
13901412
class EmbeddedGrpcWorkerHandler(GrpcWorkerHandler):
1391-
def __init__(self, num_workers_payload, state, provision_info, grpc_server):
1413+
def __init__(self,
1414+
num_workers_payload,
1415+
state, # type: FnApiRunner.StateServicer
1416+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
1417+
grpc_server # type: GrpcServer
1418+
):
13921419
super(EmbeddedGrpcWorkerHandler, self).__init__(state, provision_info,
13931420
grpc_server)
13941421
self._num_threads = int(num_workers_payload) if num_workers_payload else 1
@@ -1413,7 +1440,12 @@ def stop_worker(self):
14131440

14141441
@WorkerHandler.register_environment(python_urns.SUBPROCESS_SDK, bytes)
14151442
class SubprocessSdkWorkerHandler(GrpcWorkerHandler):
1416-
def __init__(self, worker_command_line, state, provision_info, grpc_server):
1443+
def __init__(self,
1444+
worker_command_line, # type: bytes
1445+
state, # type: FnApiRunner.StateServicer
1446+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
1447+
grpc_server # type: GrpcServer
1448+
):
14171449
super(SubprocessSdkWorkerHandler, self).__init__(state, provision_info,
14181450
grpc_server)
14191451
self._worker_command_line = worker_command_line
@@ -1433,7 +1465,12 @@ def stop_worker(self):
14331465
@WorkerHandler.register_environment(common_urns.environments.DOCKER.urn,
14341466
beam_runner_api_pb2.DockerPayload)
14351467
class DockerSdkWorkerHandler(GrpcWorkerHandler):
1436-
def __init__(self, payload, state, provision_info, grpc_server):
1468+
def __init__(self,
1469+
payload, # type: beam_runner_api_pb2.DockerPayload
1470+
state, # type: FnApiRunner.StateServicer
1471+
provision_info, # type: Optional[fn_api_runner.ExtendedProvisionInfo]
1472+
grpc_server # type: GrpcServer
1473+
):
14371474
super(DockerSdkWorkerHandler, self).__init__(state, provision_info,
14381475
grpc_server)
14391476
self._container_image = payload.container_image
@@ -1557,7 +1594,10 @@ def close_all(self):
15571594

15581595

15591596
class ExtendedProvisionInfo(object):
1560-
def __init__(self, provision_info=None, artifact_staging_dir=None):
1597+
def __init__(self,
1598+
provision_info=None, # type: Optional[beam_provision_api_pb2.ProvisionInfo]
1599+
artifact_staging_dir=None
1600+
):
15611601
self.provision_info = (
15621602
provision_info or beam_provision_api_pb2.ProvisionInfo())
15631603
self.artifact_staging_dir = artifact_staging_dir
@@ -1813,7 +1853,7 @@ def __init__(
18131853
def process_bundle(self,
18141854
inputs, # type: Mapping[str, _ListBuffer]
18151855
expected_outputs
1816-
):
1856+
):
18171857
# type: (...) -> Tuple[beam_fn_api_pb2.InstructionResponse, List[beam_fn_api_pb2.ProcessBundleSplitResponse]]
18181858
part_inputs = [{} for _ in range(self._num_workers)] # type: List[Dict[str, List]]
18191859
for name, input in inputs.items():

sdks/python/apache_beam/runners/portability/local_job_service.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,11 @@ class SubprocessSdkWorker(object):
191191
"""Manages a SDK worker implemented as a subprocess communicating over grpc.
192192
"""
193193

194-
def __init__(self, worker_command_line, control_address, worker_id=None):
194+
def __init__(self,
195+
worker_command_line, # type: bytes
196+
control_address,
197+
worker_id=None
198+
):
195199
self._worker_command_line = worker_command_line
196200
self._control_address = control_address
197201
self._worker_id = worker_id

0 commit comments

Comments
 (0)
0