experiment: yield in row batches · daniel-sanche/python-bigtable@2a82343 · GitHub
Commit 2a82343

experiment: yield in row batches

1 parent fbe298e commit 2a82343

File tree

1 file changed, +17 -24 lines changed

google/cloud/bigtable/data/_async/_read_rows.py

Lines changed: 17 additions & 24 deletions
@@ -17,6 +17,8 @@
 
 from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable
 
+from collections import deque
+
 from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
 from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
 from google.cloud.bigtable_v2.types import RowSet as RowSetPB
@@ -39,11 +41,6 @@
     from google.cloud.bigtable.data._async.client import TableAsync
 
 
-class _ResetRow(Exception):
-    def __init__(self, chunk):
-        self.chunk = chunk
-
-
 class _ReadRowsOperationAsync:
     """
     ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
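The removed _ResetRow exception was the old mechanism for abandoning a partially merged row: merge_rows raised it on seeing a reset_row chunk, and the handler discarded the accumulated state. The hunks below make it unnecessary, because reset_row chunks are now consumed inside chunk_stream and never reach merge_rows.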
@@ -155,6 +152,7 @@ async def chunk_stream(
         """
         process chunks out of raw read_rows stream
         """
+        q = deque()
         async for resp in await stream:
             # extract proto from proto-plus wrapper
             resp = resp._pb
@@ -182,14 +180,25 @@ async def chunk_stream(
                 ):
                     raise InvalidChunk("row keys should be strictly increasing")
 
-                yield c
-
                 if c.reset_row:
+                    if c.row_key or c.HasField("family_name") or c.HasField("qualifier") or c.timestamp_micros or c.labels or c.value:
+                        raise InvalidChunk("reset row has extra fields")
+                    if not q:
+                        raise InvalidChunk("reset row with no prior chunks")
                     current_key = None
-                elif c.commit_row:
+                    q.clear()
+                    continue
+
+                q.append(c)
+
+                if c.commit_row:
                     # update row state after each commit
+                    while q:
+                        yield q.popleft()
                     self._last_yielded_row_key = current_key
                     current_key = None
+        if q:
+            raise InvalidChunk("finished with incomplete row")
 
     @staticmethod
     async def merge_rows(
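This hunk is the heart of the experiment: instead of yielding each chunk as it arrives, chunk_stream buffers chunks in a deque until the row either commits (flush the whole batch downstream) or resets (drop the batch). A minimal, runnable sketch of the pattern follows; Chunk, batched_chunks, and fake_stream are hypothetical stand-ins for the real ReadRowsResponse protos and gRPC stream, not part of the library:

    import asyncio
    from collections import deque
    from dataclasses import dataclass

    @dataclass
    class Chunk:
        # hypothetical stand-in for a ReadRowsResponse.CellChunk proto
        value: bytes = b""
        reset_row: bool = False
        commit_row: bool = False

    async def batched_chunks(stream):
        """Yield chunks only in whole-row batches; drop rows that reset."""
        q = deque()
        async for c in stream:
            if c.reset_row:
                q.clear()      # discard the partial row
                continue
            q.append(c)        # buffer until the row commits
            if c.commit_row:
                while q:       # flush the completed row downstream
                    yield q.popleft()
        if q:
            raise ValueError("finished with incomplete row")

    async def fake_stream(chunks):
        for c in chunks:
            yield c

    async def main():
        chunks = [
            Chunk(value=b"stale"),                  # partial row...
            Chunk(reset_row=True),                  # ...reset by the server
            Chunk(value=b"fresh"),                  # retried row
            Chunk(value=b"done", commit_row=True),  # row commits
        ]
        async for c in batched_chunks(fake_stream(chunks)):
            print(c.value)  # b'fresh' then b'done'; b'stale' never escapes

    asyncio.run(main())

The tradeoff is memory for simplicity: a row's chunks are now held until its commit_row arrives, where the old code yielded them immediately and unwound aborted rows with the _ResetRow exception. In exchange, merge_rows only ever sees chunks belonging to rows that actually committed.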
@@ -222,8 +231,6 @@ async def merge_rows(
             try:
                 # for each cell
                 while True:
-                    if c.reset_row:
-                        raise _ResetRow(c)
                     k = c.row_key
                     f = c.family_name.value
                     q = c.qualifier.value if c.HasField("qualifier") else None
@@ -271,8 +278,6 @@ async def merge_rows(
                             if k and k != row_key:
                                 raise InvalidChunk("row key changed mid cell")
 
-                            if c.reset_row:
-                                raise _ResetRow(c)
                             buffer.append(c.value)
                         value = b"".join(buffer)
                     if family is None:
@@ -286,18 +291,6 @@ async def merge_rows(
                         yield Row(row_key, cells)
                         break
                     c = await it.__anext__()
-            except _ResetRow as e:
-                c = e.chunk
-                if (
-                    c.row_key
-                    or c.HasField("family_name")
-                    or c.HasField("qualifier")
-                    or c.timestamp_micros
-                    or c.labels
-                    or c.value
-                ):
-                    raise InvalidChunk("reset row with data")
-                continue
             except StopAsyncIteration:
                 raise InvalidChunk("premature end of stream")
 
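With resets filtered out upstream, merge_rows can assume every chunk it receives belongs to a row that will commit, so both inline reset checks and this entire except _ResetRow handler come out. The "reset row with data" validation is not lost: it survives in chunk_stream as the "reset row has extra fields" check.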
