8000 [BEAM-8271] Properly encode/decode StateGetRequest/Response continuat… · apache/beam@e15d33f · GitHub
[go: up one dir, main page]

Skip to content

Commit e15d33f

Browse files
committed
[BEAM-8271] Properly encode/decode StateGetRequest/Response continuation_token
Ensure proper handling of bytes vs unicode
1 parent 6463d6e commit e15d33f

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

sdks/python/apache_beam/runners/portability/fn_api_runner.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,7 +1106,7 @@ def __init__(self):
11061106
self._state = collections.defaultdict(list) # type: FnApiRunner.StateServicer.StateType
11071107
self._checkpoint = None # type: Optional[FnApiRunner.StateServicer.StateType]
11081108
self._use_continuation_tokens = False
1109-
self._continuations = {} # type: Dict[str, Tuple[bytes, ...]]
1109+
self._continuations = {} # type: Dict[bytes, Tuple[bytes, ...]]
11101110

11111111
def checkpoint(self):
11121112
# type: () -> None
@@ -1145,17 +1145,17 @@ def get_raw(self,
11451145
if self._use_continuation_tokens:
11461146
# The token is "nonce:index".
11471147
if not continuation_token:
1148-
token_base = 'token_%x' % len(self._continuations)
1148+
token_base = b'token_%x' % len(self._continuations)
11491149
self._continuations[token_base] = tuple(full_state)
1150-
return b'', '%s:0' % token_base
1150+
return b'', b'%s:0' % token_base
11511151
else:
1152-
token_base, index = continuation_token.split(':')
1152+
token_base, index = continuation_token.split(b':')
11531153
ix = int(index)
11541154
full_state_cont = self._continuations[token_base]
11551155
if ix == len(full_state_cont):
11561156
return b'', None
11571157
else:
1158-
return full_state_cont[ix], '%s:%d' % (token_base, ix + 1)
1158+
return full_state_cont[ix], b'%s:%d' % (token_base, ix + 1)
11591159
else:
11601160
assert not continuation_token
11611161
return b''.join(full_state), None

sdks/python/apache_beam/runners/worker/sdk_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,7 @@ def _materialize_iter(self,
842842
"""Materializes the state lazily, one element at a time.
843843
:return A generator which returns the next element if advanced.
844844
"""
845-
continuation_token = None
845+
continuation_token = None # type: Optional[bytes]
846846
while True:
847847
data, continuation_token = \
848848
self._underlying.get_raw(state_key, continuation_token)

0 commit comments

Comments
 (0)
0