8000 fix: resume iterator on EOS internal error (#122) · googleapis/python-spanner@45a1538 · GitHub
[go: up one dir, main page]

Skip to content

Commit 45a1538

Browse files
larkeetseaver
andauthored
fix: resume iterator on EOS internal error (#122)
* fix: resume iterator on EOS internal error * fix: add additional stream resumption message * test: add unit tests * Apply suggestions from code review Co-authored-by: Tres Seaver <tseaver@palladion.com> Co-authored-by: larkee <larkee@users.noreply.github.com> Co-authored-by: Tres Seaver <tseaver@palladion.com>
1 parent 808837b commit 45a1538

File tree

2 files changed

+151
-7
lines changed

2 files changed

+151
-7
lines changed

google/cloud/spanner_v1/snapshot.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionOptions
2121
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector
2222

23+
from google.api_core.exceptions import InternalServerError
2324
from google.api_core.exceptions import ServiceUnavailable
2425
import google.api_core.gapic_v1.method
2526
from google.cloud._helpers import _datetime_to_pb_timestamp
@@ -32,6 +33,11 @@
3233
from google.cloud.spanner_v1.types import PartitionOptions
3334
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
3435

36+
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
37+
"RST_STREAM",
38+
"Received unexpected EOS on DATA frame from server",
39+
)
40+
3541

3642
def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None):
3743
"""Restart iteration after :exc:`.ServiceUnavailable`.
@@ -55,6 +61,17 @@ def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=N
5561
with trace_call(trace_name, session, attributes):
5662
iterator = restart(resume_token=resume_token)
5763
continue
64+
except InternalServerError as exc:
65+
resumable_error = any(
66+
resumable_message in exc.message
67+
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
68+
)
69+
if not resumable_error:
70+
raise
71+
del item_buffer[:]
72+
with trace_call(trace_name, session, attributes):
73+
iterator = restart(resume_token=resume_token)
74+
continue
5875

5976
if len(item_buffer) == 0:
6077
break

tests/unit/test_snapshot.py

Lines changed: 134 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,24 +86,68 @@ def test_iteration_w_raw_w_resume_tken(self):
8686
self.assertNoSpans()
8787

8888
def test_iteration_w_raw_raising_unavailable_no_token(self):
89+
from google.api_core.exceptions import ServiceUnavailable
90+
91+
ITEMS = (
92+
self._make_item(0),
93+
self._make_item(1, resume_token=RESUME_TOKEN),
94+
self._make_item(2),
95+
)
96+
before = _MockIterator(fail_after=True, error=ServiceUnavailable("testing"))
97+
after = _MockIterator(*ITEMS)
98+
restart = mock.Mock(spec=[], side_effect=[before, after])
99+
resumable = self._call_fut(restart)
100+
self.assertEqual(list(resumable), list(ITEMS))
101+
self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")])
102+
self.assertNoSpans()
103+
104+
def test_iteration_w_raw_raising_retryable_internal_error_no_token(self):
105+
from google.api_core.exceptions import InternalServerError
106+
89107
ITEMS = (
90108
self._make_item(0),
91109
self._make_item(1, resume_token=RESUME_TOKEN),
92110
self._make_item(2),
93111
)
94-
before = _MockIterator(fail_after=True)
112+
before = _MockIterator(
113+
fail_after=True,
114+
error=InternalServerError(
115+
"Received unexpected EOS on DATA frame from server"
116+
),
117+
)
95118
after = _MockIterator(*ITEMS)
96119
restart = mock.Mock(spec=[], side_effect=[before, after])
97120
resumable = self._call_fut(restart)
98121
self.assertEqual(list(resumable), list(ITEMS))
99122
self.assertEqual(restart.mock_calls, [mock.call(), mock.call(resume_token=b"")])
100123
self.assertNoSpans()
101124

125+
def test_iteration_w_raw_raising_non_retryable_internal_error_no_token(self):
126+
from google.api_core.exceptions import InternalServerError
127+
128+
ITEMS = (
129+
self._make_item(0),
130+
self._make_item(1, resume_token=RESUME_TOKEN),
131+
self._make_item(2),
132+
)
133+
before = _MockIterator(fail_after=True, error=InternalServerError("testing"))
134+
after = _MockIterator(*ITEMS)
135+
restart = mock.Mock(spec=[], side_effect=[before, after])
136+
resumable = self._call_fut(restart)
137+
with self.assertRaises(InternalServerError):
138+
list(resumable)
139+
self.assertEqual(restart.mock_calls, [mock.call()])
140+
self.assertNoSpans()
141+
102142
def test_iteration_w_raw_raising_unavailable(self):
143+
from google.api_core.exceptions import ServiceUnavailable
144+
103145
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
104146
SECOND = (self._make_item(2),) # discarded after 503
105147
LAST = (self._make_item(3),)
106-
before = _MockIterator(*(FIRST + SECOND), fail_after=True)
148+
before = _MockIterator(
149+
*(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing")
150+
)
107151
after = _MockIterator(*LAST)
108152
restart = mock.Mock(spec=[], side_effect=[before, after])
109153
resumable = self._call_fut(restart)
@@ -113,10 +157,53 @@ def test_iteration_w_raw_raising_unavailable(self):
113157
)
114158
self.assertNoSpans()
115159

160+
def test_iteration_w_raw_raising_retryable_internal_error(self):
161+
from google.api_core.exceptions import InternalServerError
162+
163+
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
164+
SECOND = (self._make_item(2),) # discarded after 503
165+
LAST = (self._make_item(3),)
166+
before = _MockIterator(
167+
*(FIRST + SECOND),
168+
fail_after=True,
169+
error=InternalServerError(
170+
"Received unexpected EOS on DATA frame from server"
171+
)
172+
)
173+
after = _MockIterator(*LAST)
174+
restart = mock.Mock(spec=[], side_effect=[before, after])
175+
resumable = self._call_fut(restart)
176+
self.assertEqual(list(resumable), list(FIRST + LAST))
177+
self.assertEqual(
178+
restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)]
179+
)
180+
self.assertNoSpans()
181+
182+
def test_iteration_w_raw_raising_non_retryable_internal_error(self):
183+
from google.api_core.exceptions import InternalServerError
184+
185+
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
186+
SECOND = (self._make_item(2),) # discarded after 503
187+
LAST = (self._make_item(3),)
188+
before = _MockIterator(
189+
*(FIRST + SECOND), fail_after=True, error=InternalServerError("testing")
190+
)
191+
after = _MockIterator(*LAST)
192+
restart = mock.Mock(spec=[], side_effect=[before, after])
193+
resumable = self._call_fut(restart)
194+
with self.assertRaises(InternalServerError):
195+
list(resumable)
196+
self.assertEqual(restart.mock_calls, [mock.call()])
197+
self.assertNoSpans()
198+
116199
def test_iteration_w_raw_raising_unavailable_after_token(self):
200+
from google.api_core.exceptions import ServiceUnavailable
201+
117202
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
118203
SECOND = (self._make_item(2), self._make_item(3))
119-
before = _MockIterator(*FIRST, fail_after=True)
204+
before = _MockIterator(
205+
*FIRST, fail_after=True, error=ServiceUnavailable("testing")
206+
)
120207
after = _MockIterator(*SECOND)
121208
restart = mock.Mock(spec=[], side_effect=[before, after])
122209
resumable = self._call_fut(restart)
@@ -126,6 +213,43 @@ def test_iteration_w_raw_raising_unavailable_after_token(self):
126213
)
127214
self.assertNoSpans()
128215

216+
def test_iteration_w_raw_raising_retryable_internal_error_after_token(self):
217+
from google.api_core.exceptions import InternalServerError
218+
219+
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
220+
SECOND = (self._make_item(2), self._make_item(3))
221+
before = _MockIterator(
222+
*FIRST,
223+
fail_after=True,
224+
error=InternalServerError(
225+
"Received unexpected EOS on DATA frame from server"
226+
)
227+
)
228+
after = _MockIterator(*SECOND)
229+
restart = mock.Mock(spec=[], side_effect=[before, after])
230+
resumable = self._call_fut(restart)
231+
self.assertEqual(list(resumable), list(FIRST + SECOND))
232+
self.assertEqual(
233+
restart.mock_calls, [mock.call(), mock.call(resume_token=RESUME_TOKEN)]
234+
)
235+
self.assertNoSpans()
236+
237+
def test_iteration_w_raw_raising_non_retryable_internal_error_after_token(self):
238+
from google.api_core.exceptions import InternalServerError
239+
240+
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
241+
SECOND = (self._make_item(2), self._make_item(3))
< 10000 /td>
242+
before = _MockIterator(
243+
*FIRST, fail_after=True, error=InternalServerError("testing")
244+
)
245+
after = _MockIterator(*SECOND)
246+
restart = mock.Mock(spec=[], side_effect=[before, after])
247+
resumable = self._call_fut(restart)
248+
with self.assertRaises(InternalServerError):
249+
list(resumable)
250+
self.assertEqual(restart.mock_calls, [mock.call()])
251+
self.assertNoSpans()
252+
129253
def test_iteration_w_span_creation(self):
130254
name = "TestSpan"
131255
extra_atts = {"test_att": 1}
@@ -136,11 +260,15 @@ def test_iteration_w_span_creation(self):
136260
self.assertSpanAttributes(name, attributes=dict(BASE_ATTRIBUTES, test_att=1))
137261

138262
def test_iteration_w_multiple_span_creation(self):
263+
from google.api_core.exceptions import ServiceUnavailable
264+
139265
if HAS_OPENTELEMETRY_INSTALLED:
140266
FIRST = (self._make_item(0), self._make_item(1, resume_token=RESUME_TOKEN))
141267
SECOND = (self._make_item(2),) # discarded after 503
142268
LAST = (self._make_item(3),)
143-
before = _MockIterator(*(FIRST + SECOND), fail_after=True)
269+
before = _MockIterator(
270+
*(FIRST + SECOND), fail_after=True, error=ServiceUnavailable("testing")
271+
)
144272
after = _MockIterator(*LAST)
145273
restart = mock.Mock(spec=[], side_effect=[before, after])
146274
name = "TestSpan"
@@ -1153,18 +1281,17 @@ class _MockIterator(object):
11531281
def __init__(self, *values, **kw):
11541282
self._iter_values = iter(values)
11551283
self._fail_after = kw.pop("fail_after", False)
1284+
self._error = kw.pop("error", Exception)
11561285

11571286
def __iter__(self):
11581287
return self
11591288

11601289
def __next__(self):
1161-
from google.api_core.exceptions import ServiceUnavailable
1162-
11631290
try:
11641291
return next(self._iter_values)
11651292
except StopIteration:
11661293
if self._fail_after:
1167-
raise ServiceUnavailable("testing")
1294+
raise self._error
11681295
raise
11691296

11701297
next = __next__

0 commit comments

Comments
 (0)
0