@@ -17,6 +17,8 @@

 from typing import TYPE_CHECKING, AsyncGenerator, AsyncIterable, Awaitable

+from collections import deque
+
 from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
 from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
 from google.cloud.bigtable_v2.types import RowSet as RowSetPB
@@ -39,11 +41,6 @@
     from google.cloud.bigtable.data._async.client import TableAsync


-class _ResetRow(Exception):
-    def __init__(self, chunk):
-        self.chunk = chunk
-
-
 class _ReadRowsOperationAsync:
     """
     ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
@@ -155,6 +152,7 @@ async def chunk_stream(
         """
         process chunks out of raw read_rows stream
         """
+        q = deque()
         async for resp in await stream:
             # extract proto from proto-plus wrapper
             resp = resp._pb
@@ -182,14 +180,25 @@ async def chunk_stream(
                 ):
                     raise InvalidChunk("row keys should be strictly increasing")

-                yield c
-
                 if c.reset_row:
+                    if c.row_key or c.HasField("family_name") or c.HasField("qualifier") or c.timestamp_micros or c.labels or c.value:
+                        raise InvalidChunk("reset row has extra fields")
+                    if not q:
+                        raise InvalidChunk("reset row with no prior chunks")
                     current_key = None
-                elif c.commit_row:
+                    q.clear()
+                    continue
+
+                q.append(c)
+
+                if c.commit_row:
                     # update row state after each commit
+                    while q:
+                        yield q.popleft()
                     self._last_yielded_row_key = current_key
                     current_key = None
+        if q:
+            raise InvalidChunk("finished with incomplete row")

     @staticmethod
     async def merge_rows(
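
Note on the chunk_stream change above: chunks are now staged in a deque and only released once their row commits, so a reset_row chunk is validated and then the staged chunks are dropped, instead of having already been yielded downstream. A minimal sketch of this buffer-until-commit pattern, using a simplified stand-in for ReadRowsResponse.CellChunk (the Chunk dataclass below is illustrative, not the library's type):

    from collections import deque
    from dataclasses import dataclass

    @dataclass
    class Chunk:
        # simplified stand-in for ReadRowsResponse.CellChunk
        value: bytes = b""
        reset_row: bool = False
        commit_row: bool = False

    def buffer_until_commit(chunks):
        q = deque()
        for c in chunks:
            if c.reset_row:
                q.clear()  # discard the aborted row attempt entirely
                continue
            q.append(c)
            if c.commit_row:
                while q:  # a row's chunks surface only after its commit
                    yield q.popleft()
        if q:
            raise ValueError("finished with incomplete row")

    stream = [Chunk(b"a"), Chunk(reset_row=True), Chunk(b"b"), Chunk(b"c", commit_row=True)]
    assert [c.value for c in buffer_until_commit(stream)] == [b"b", b"c"]
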
@@ -222,8 +231,6 @@ async def merge_rows(
             try:
                 # for each cell
                 while True:
-                    if c.reset_row:
-                        raise _ResetRow(c)
                     k = c.row_key
                     f = c.family_name.value
                     q = c.qualifier.value if c.HasField("qualifier") else None
@@ -271,8 +278,6 @@ async def merge_rows(
                             if k and k != row_key:
                                 raise InvalidChunk("row key changed mid cell")

-                            if c.reset_row:
-                                raise _ResetRow(c)
                             buffer.append(c.value)
                         value = b"".join(buffer)
                     if family is None:
@@ -286,18 +291,6 @@ async def merge_rows(
                         yield Row(row_key, cells)
                         break
                     c = await it.__anext__()
-            except _ResetRow as e:
-                c = e.chunk
-                if (
-                    c.row_key
-                    or c.HasField("family_name")
-                    or c.HasField("qualifier")
-                    or c.timestamp_micros
-                    or c.labels
-                    or c.value
-                ):
-                    raise InvalidChunk("reset row with data")
-                continue
             except StopAsyncIteration:
                 raise InvalidChunk("premature end of stream")
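
With reset_row chunks now consumed inside chunk_stream, merge_rows can never encounter one, which is why the _ResetRow exception and its handler become dead code in the hunks above. The deleted code used an exception purely as control flow to unwind a half-built row; the replacement buffers instead. A toy side-by-side of the two styles (hypothetical token parser, not the library's API):

    class Reset(Exception):
        """Stand-in for the deleted _ResetRow control-flow exception."""

    def parse_with_exception(tokens):
        # old style: raising unwinds the half-built row, the handler discards it
        out, row = [], []
        for t in tokens:
            try:
                if t == "reset":
                    raise Reset()
                row.append(t)
                if t == "commit":
                    out.append(row)
                    row = []
            except Reset:
                row = []  # drop the partial row, keep consuming
        return out

    def parse_with_buffer(tokens):
        # new style: nothing is emitted until the row commits
        out, q = [], []
        for t in tokens:
            if t == "reset":
                q.clear()
                continue
            q.append(t)
            if t == "commit":
                out.append(q[:])
                q.clear()
        return out

    tokens = ["a", "reset", "b", "commit"]
    assert parse_with_exception(tokens) == parse_with_buffer(tokens) == [["b", "commit"]]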