-
Notifications
You must be signed in to change notification settings - Fork 31
feat: support aborted transactions internal retry #544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
50580b4
481db2a
313e16d
3a7537c
4edf6c1
f87129a
49e17be
a8158b3
ccf5385
a537628
0b1a641
0e2ca3e
a4ffab5
fc890c2
d204836
9ea2a01
7e70d86
870e170
450b91b
578eaa2
59d597a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
# Copyright 2020 Google LLC | ||
# | ||
# Use of this source code is governed by a BSD-style | ||
# license that can be found in the LICENSE file or at | ||
# https://developers.google.com/open-source/licenses/bsd | ||
|
||
"""API to calculate checksums of SQL statements results.""" | ||
|
||
import hashlib | ||
import pickle | ||
|
||
from google.cloud.spanner_dbapi.exceptions import RetryAborted | ||
|
||
|
||
class ResultsChecksum: | ||
"""Cumulative checksum. | ||
|
||
Used to calculate a total checksum of all the results | ||
returned by operations executed within transaction. | ||
Includes methods for checksums comparison. | ||
These checksums are used while retrying an aborted | ||
transaction to check if the results of a retried transaction | ||
are equal to the results of the original transaction. | ||
""" | ||
|
||
def __init__(self): | ||
self.checksum = hashlib.sha256() | ||
self.count = 0 # counter of consumed results | ||
|
||
def __len__(self): | ||
"""Return the number of consumed results. | ||
|
||
:rtype: :class:`int` | ||
:returns: The number of results. | ||
""" | ||
return self.count | ||
|
||
def __eq__(self, other): | ||
"""Check if checksums are equal. | ||
|
||
:type other: :class:`google.cloud.spanner_dbapi.checksum.ResultsChecksum` | ||
:param other: Another checksum to compare with this one. | ||
""" | ||
return self.checksum.digest() == other.checksum.digest() | ||
|
||
def consume_result(self, result): | ||
"""Add the given result into the checksum. | ||
|
||
:type result: Union[int, list] | ||
:param result: Streamed row or row count from an UPDATE operation. | ||
""" | ||
self.checksum.update(pickle.dumps(result)) | ||
self.count += 1 | ||
|
||
|
||
def _compare_checksums(original, retried): | ||
"""Compare the given checksums. | ||
|
||
Raise an error if the given checksums are not equal. | ||
|
||
:type original: :class:`~google.cloud.spanner_dbapi.checksum.ResultsChecksum` | ||
:param original: results checksum of the original transaction. | ||
|
||
:type retried: :class:`~google.cloud.spanner_dbapi.checksum.ResultsChecksum` | ||
:param retried: results checksum of the retried transaction. | ||
|
||
:raises: :exc:`google.cloud.spanner_dbapi.exceptions.RetryAborted` in case if checksums are not equal. | ||
""" | ||
if retried != original: | ||
raise RetryAborted( | ||
"The transaction was aborted and could not be retried due to a concurrent modification." | ||
) |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -6,6 +6,7 @@ | |||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
"""Database cursor for Google Cloud Spanner DB-API.""" | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
from google.api_core.exceptions import Aborted | ||||||||||||||||||||||||||||||||||||
from google.api_core.exceptions import AlreadyExists | ||||||||||||||||||||||||||||||||||||
from google.api_core.exceptions import FailedPrecondition | ||||||||||||||||||||||||||||||||||||
from google.api_core.exceptions import InternalServerError | ||||||||||||||||||||||||||||||||||||
|
@@ -14,7 +15,7 @@ | |||||||||||||||||||||||||||||||||||
from collections import namedtuple | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
from google.cloud import spanner_v1 as spanner | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi.checksum import ResultsChecksum | ||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi.exceptions import IntegrityError | ||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi.exceptions import InterfaceError | ||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi.exceptions import OperationalError | ||||||||||||||||||||||||||||||||||||
|
@@ -26,11 +27,13 @@ | |||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi import parse_utils | ||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi.parse_utils import get_param_types | ||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi.parse_utils import sql_pyformat_args_to_spanner | ||||||||||||||||||||||||||||||||||||
from google.cloud.spanner_dbapi.utils import PeekIterator | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
_UNSET_COUNT = -1 | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
ColumnDetails = namedtuple("column_details", ["null_ok", "spanner_type"]) | ||||||||||||||||||||||||||||||||||||
Statement = namedtuple("Statement", "sql, params, param_types, checksum") | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
class Cursor(object): | ||||||||||||||||||||||||||||||||||||
|
@@ -46,6 +49,8 @@ def __init__(self, connection): | |||||||||||||||||||||||||||||||||||
self._row_count = _UNSET_COUNT | ||||||||||||||||||||||||||||||||||||
self.connection = connection | ||||||||||||||||||||||||||||||||||||
self._is_closed = False | ||||||||||||||||||||||||||||||||||||
# the currently running SQL statement results checksum | ||||||||||||||||||||||||||||||||||||
self._checksum = None | ||||||||||||||||||||||||||||||||||||
IlyaFaer marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
# the number of rows to fetch at a time with fetchmany() | ||||||||||||||||||||||||||||||||||||
self.arraysize = 1 | ||||||||||||||||||||||||||||||||||||
|
@@ -158,15 +163,15 @@ def execute(self, sql, args=None): | |||||||||||||||||||||||||||||||||||
self.connection.run_prior_DDL_statements() | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
if not self.connection.autocommit: | ||||||||||||||||||||||||||||||||||||
transaction = self.connection.transaction_checkout() | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
sql, params = parse_utils.sql_pyformat_args_to_spanner( | ||||||||||||||||||||||||||||||||||||
sql, args | ||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||
sql, params = sql_pyformat_args_to_spanner(sql, args) | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
self._result_set = transaction.execute_sql( | ||||||||||||||||||||||||||||||||||||
sql, params, param_types=get_param_types(params) | ||||||||||||||||||||||||||||||||||||
statement = Statement( | ||||||||||||||||||||||||||||||||||||
sql, params, get_param_types(params), ResultsChecksum(), | ||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||
( | ||||||||||||||||||||||||||||||||||||
self._result_set, | ||||||||||||||||||||||||||||||||||||
self._checksum, | ||||||||||||||||||||||||||||||||||||
) = self.connection.run_statement(statement) | ||||||||||||||||||||||||||||||||||||
self._itr = PeekIterator(self._result_set) | ||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
|
@@ -207,9 +212,31 @@ def fetchone(self): | |||||||||||||||||||||||||||||||||||
self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||
return next(self) | ||||||||||||||||||||||||||||||||||||
res = next(self) | ||||||||||||||||||||||||||||||||||||
IlyaFaer marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||
self._checksum.consume_result(res) | ||||||||||||||||||||||||||||||||||||
IlyaFaer marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||
return res | ||||||||||||||||||||||||||||||||||||
except StopIteration: | ||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||
except Aborted: | ||||||||||||||||||||||||||||||||||||
self.connection.retry_transaction() | ||||||||||||||||||||||||||||||||||||
return self.fetchone() | ||||||||||||||||||||||||||||||||||||
IlyaFaer marked this conversation as resolved.
Show resolved
Hide resolved
IlyaFaer marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this will be a problem. Assuming that this is using the Assume the following situation:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems to me we can just drop the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for reopening this, and this comment should not be considered blocking for merging this PR, but I think we need to look into this once more. Only dropping the
The JDBC driver client solves the above problem by wrapping all streaming iterators before returning these to the client application. That makes it possible for the JDBC driver to replace the underlying streaming iterator with a new one when a transaction has been aborted and successfully retried. We should add that to the Python DBApi as well, but we could do that in a separate PR to prevent this PR from becoming even bigger than it already is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @c24t, @olavloite, hm-m. I think we're protected from errors here, because our connection API doesn't actually give streaming result objects to a user. Here is where we're getting a streaming iterator: python-spanner-django/google/cloud/spanner_dbapi/cursor.py Lines 167 to 170 in 196c449
So, iterator is held in the protected property python-spanner-django/google/cloud/spanner_dbapi/cursor.py Lines 204 to 212 in 196c449
Where python-spanner-django/google/cloud/spanner_dbapi/cursor.py Lines 293 to 296 in 196c449
Thus, if a transaction failed, the connection will drop the transaction, checkout a new one, re-run all the statements, each of which will replace |
||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
def fetchall(self): | ||||||||||||||||||||||||||||||||||||
"""Fetch all (remaining) rows of a query result, returning them as | ||||||||||||||||||||||||||||||||||||
a sequence of sequences. | ||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||
self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
res = [] | ||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||
for row in self: | ||||||||||||||||||||||||||||||||||||
self._checksum.consume_result(row) | ||||||||||||||||||||||||||||||||||||
res.append(row) | ||||||||||||||||||||||||||||||||||||
except Aborted: | ||||||||||||||||||||||||||||||||||||
self._connection.retry_transaction() | ||||||||||||||||||||||||||||||||||||
return self.fetchall() | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
return res | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
def fetchmany(self, size=None): | ||||||||||||||||||||||||||||||||||||
"""Fetch the next set of rows of a query result, returning a sequence | ||||||||||||||||||||||||||||||||||||
|
@@ -230,20 +257,17 @@ def fetchmany(self, size=None): | |||||||||||||||||||||||||||||||||||
items = [] | ||||||||||||||||||||||||||||||||||||
for i in range(size): | ||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||
items.append(tuple(self.__next__())) | ||||||||||||||||||||||||||||||||||||
IlyaFaer marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||
res = next(self) | ||||||||||||||||||||||||||||||||||||
IlyaFaer marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||
self._checksum.consume_result(res) | ||||||||||||||||||||||||||||||||||||
items.append(res) | ||||||||||||||||||||||||||||||||||||
except StopIteration: | ||||||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||||||
except Aborted: | ||||||||||||||||||||||||||||||||||||
self._connection.retry_transaction() | ||||||||||||||||||||||||||||||||||||
return self.fetchmany(size) | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
return items | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
def fetchall(self): | ||||||||||||||||||||||||||||||||||||
"""Fetch all (remaining) rows of a query result, returning them as | ||||||||||||||||||||||||||||||||||||
a sequence of sequences. | ||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||
self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
return list(self.__iter__()) | ||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||
def nextset(self): | ||||||||||||||||||||||||||||||||||||
"""A no-op, raising an error if the cursor or connection is closed.""" | ||||||||||||||||||||||||||||||||||||
self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
|
Uh oh!
There was an error while loading. Please reload this page.