10000 feat: add new_transaction support (#499) · googleapis/python-datastore@43855dd · GitHub
[go: up one dir, main page]

Skip to content

Commit 43855dd

Browse files
feat: add new_transaction support (#499)
1 parent f4f3bc7 commit 43855dd

12 files changed

+580
-59
lines changed

google/cloud/datastore/aggregation.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -442,13 +442,11 @@ def _next_page(self):
442442
return None
443443

444444
query_pb = self._build_protobuf()
445-
transaction = self.client.current_transaction
446-
if transaction is None:
447-
transaction_id = None
448-
else:
449-
transaction_id = transaction.id
445+
transaction_id, new_transaction_options = helpers.get_transaction_options(
446+
self.client.current_transaction
447+
)
450448
read_options = helpers.get_read_options(
451-
self._eventual, transaction_id, self._read_time
449+
self._eventual, transaction_id, self._read_time, new_transaction_options
452450
)
453451

454452
partition_id = entity_pb2.PartitionId(

google/cloud/datastore/batch.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,19 @@ def mutations(self):
192192
"""
193193
return self._mutations
194194

195+
def _allow_mutations(self) -> bool:
196+
"""
197+
This method is called to see if the batch is in a proper state to allow
198+
`put` and `delete` operations.
199+
200+
the Transaction subclass overrides this method to support
201+
the `begin_later` flag.
202+
203+
:rtype: bool
204+
:returns: True if the batch is in a state to allow mutations.
205+
"""
206+
return self._status == self._IN_PROGRESS
207+
195208
def put(self, entity):
196209
"""Remember an entity's state to be saved during :meth:`commit`.
197210
@@ -218,7 +231,7 @@ def put(self, entity):
218231
progress, if entity has no key assigned, or if the key's
219232
``project`` does not match ours.
220233
"""
221-
if self._status != self._IN_PROGRESS:
234+
if not self._allow_mutations():
222235
raise ValueError("Batch must be in progress to put()")
223236

224237
if entity.key is None:
@@ -248,7 +261,7 @@ def delete(self, key):
248261
progress, if key is not complete, or if the key's
249262
``project`` does not match ours.
250263
"""
251-
if self._status != self._IN_PROGRESS:
264+
if not self._allow_mutations():
252265
raise ValueError("Batch must be in progress to delete()")
253266

254267
if key.is_partial:
@@ -370,10 +383,12 @@ def __enter__(self):
370383

371384
def __exit__(self, exc_type, exc_val, exc_tb):
372385
try:
373-
if exc_type is None:
374-
self.commit()
375-
else:
376-
self.rollback()
386+
# commit or rollback if not in terminal state
387+
if self._status not in (self._ABORTED, self._FINISHED):
388+
if exc_type is None:
389+
self.commit()
390+
else:
391+
self.rollback()
377392
finally:
378393
self._client._pop_batch()
379394

google/cloud/datastore/client.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def _extended_lookup(
122122
missing=None,
123123
deferred=None,
124124
eventual=False,
125-
transaction_id=None,
125+
transaction=None,
126126
retry=None,
127127
timeout=None,
128128
read_time=None,
@@ -158,10 +158,10 @@ def _extended_lookup(
158158
consistency. If True, request ``EVENTUAL`` read
159159
consistency.
160160
161-
:type transaction_id: str
162-
:param transaction_id: If passed, make the request in the scope of
163-
the given transaction. Incompatible with
164-
``eventual==True`` or ``read_time``.
161+
:type transaction: Transaction
162+
:param transaction: If passed, make the request in the scope of
163+
the given transaction. Incompatible with
164+
``eventual==True`` or ``read_time``.
165165
166166
:type retry: :class:`google.api_core.retry.Retry`
167167
:param retry:
@@ -177,7 +177,7 @@ def _extended_lookup(
177177
:type read_time: datetime
178178
:param read_time:
179179
(Optional) Read time to use for read consistency. Incompatible with
180-
``eventual==True`` or ``transaction_id``.
180+
``eventual==True`` or ``transaction``.
181181
This feature is in private preview.
182182
183183
:type database: str
@@ -199,8 +199,14 @@ def _extended_lookup(
199199

200200
results = []
201201

202+
transaction_id = None
203+
transaction_id, new_transaction_options = helpers.get_transaction_options(
204+
transaction
205+
)
206+
read_options = helpers.get_read_options(
207+
eventual, transaction_id, read_time, new_transaction_options
208+
)
202209
loop_num = 0
203-
read_options = helpers.get_read_options(eventual, transaction_id, read_time)
204210
while loop_num < _MAX_LOOPS: # loop against possible deferred.
205211
loop_num += 1
206212
request = {
@@ -214,6 +220,10 @@ def _extended_lookup(
214220
**kwargs,
215221
)
216222

223+
# set new transaction id if we just started a transaction
224+
if transaction and lookup_response.transaction:
225+
transaction._begin_with_id(lookup_response.transaction)
226+
217227
# Accumulate the new results.
218228
results.extend(result.entity for result in lookup_response.found)
219229

@@ -570,7 +580,7 @@ def get_multi(
570580
eventual=eventual,
571581
missing=missing,
572582
deferred=deferred,
573-
transaction_id=transaction and transaction.id,
583+
transaction=transaction,
574584
retry=retry,
575585
timeout=timeout,
576586
read_time=read_time,

google/cloud/datastore/helpers.py

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ def entity_to_protobuf(entity):
230230
return entity_pb
231231

232232

233-
def get_read_options(eventual, transaction_id, read_time=None):
233+
def get_read_options(
234+
eventual, transaction_id, read_time=None, new_transaction_options=None
235+
):
234236
"""Validate rules for read options, and assign to the request.
235237
236238
Helper method for ``lookup()`` and ``run_query``.
@@ -245,33 +247,55 @@ def get_read_options(eventual, transaction_id, read_time=None):
245247
:type read_time: datetime
246248
:param read_time: Read data from the specified time (may be null). This feature is in private preview.
247249
250+
:type new_transaction_options: :class:`google.cloud.datastore_v1.types.TransactionOptions`
251+
:param new_transaction_options: Options for a new transaction.
252+
248253
:rtype: :class:`.datastore_pb2.ReadOptions`
249254
:returns: The read options corresponding to the inputs.
250255
:raises: :class:`ValueError` if more than one of ``eventual==True``,
251-
``transaction``, and ``read_time`` is specified.
256+
``transaction_id``, ``read_time``, and ``new_transaction_options`` is specified.
252257
"""
253-
if transaction_id is None:
254-
if eventual:
255-
if read_time is not None:
256-
raise ValueError("eventual must be False when read_time is specified")
257-
else:
258-
return datastore_pb2.ReadOptions(
259-
read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
260-
)
261-
else:
262-
if read_time is None:
263-
return datastore_pb2.ReadOptions()
264-
else:
265-
read_time_pb = timestamp_pb2.Timestamp()
266-
read_time_pb.FromDatetime(read_time)
267-
return datastore_pb2.ReadOptions(read_time=read_time_pb)
268-
else:
269-
if eventual:
270-
raise ValueError("eventual must be False when in a transaction")
271-
elif read_time is not None:
272-
raise ValueError("transaction and read_time are mutual exclusive")
273-
else:
274-
return datastore_pb2.ReadOptions(transaction=transaction_id)
258+
is_set = [
259+
bool(x) for x in (eventual, transaction_id, read_time, new_transaction_options)
260+
]
261+
if sum(is_set) > 1:
262< 2851 /td>+
raise ValueError(
263+
"At most one of eventual, transaction, or read_time is allowed."
264+
)
265+
new_options = datastore_pb2.ReadOptions()
266+
if transaction_id is not None:
267+
new_options.transaction = transaction_id
268+
if read_time is not None:
269+
read_time_pb = timestamp_pb2.Timestamp()
270+
read_time_pb.FromDatetime(read_time)
271+
new_options.read_time = read_time_pb
272+
if new_transaction_options is not None:
273+
new_options.new_transaction = new_transaction_options
274+
if eventual:
275+
new_options.read_consistency = (
276+
datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
277+
)
278+
return new_options
279+
280+
281+
def get_transaction_options(transaction):
282+
"""
283+
Get the transaction_id or new_transaction_options field from an active transaction object,
284+
for use in get_read_options
285+
286+
These are mutually-exclusive fields, so one or both will be None.
287+
288+
:rtype: Tuple[Optional[bytes], Optional[google.cloud.datastore_v1.types.TransactionOptions]]
289+
:returns: The transaction_id and new_transaction_options fields from the transaction object.
290+
"""
291+
transaction_id, new_transaction_options = None, None
292+
if transaction is not None:
293+
if transaction.id is not None:
294+
transaction_id = transaction.id
295+
elif transaction._begin_later and transaction._status == transaction._INITIAL:
296+
# If the transaction has not yet been begun, we can use the new_transaction_options field.
297+
new_transaction_options = transaction._options
298+
return transaction_id, new_transaction_options
275299

276300

277301
def key_from_protobuf(pb):

google/cloud/datastore/query.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -778,13 +778,12 @@ def _next_page(self):
778778
return None
779779

780780
query_pb = self._build_protobuf()
781-
transaction = self.client.current_transaction
782-
if transaction is None:
783-
transaction_id = None
784-
else:
785-
transaction_id = transaction.id
781+
new_transaction_options = None
782+
transaction_id, new_transaction_options = helpers.get_transaction_options(
783+
self.client.current_transaction
784+
)
786785
read_options = helpers.get_read_options(
787-
self._eventual, transaction_id, self._read_time
786+
self._eventual, transaction_id, self._read_time, new_transaction_options
788787
)
789788

790789
partition_id = entity_pb2.PartitionId(

google/cloud/datastore/transaction.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414

1515
"""Create / interact with Google Cloud Datastore transactions."""
16-
1716
from google.cloud.datastore.batch import Batch
1817
from google.cloud.datastore_v1.types import TransactionOptions
1918
from google.protobuf import timestamp_pb2
@@ -149,15 +148,23 @@ class Transaction(Batch):
149148
:param read_time: (Optional) Time at which the transaction reads entities.
150149
Only allowed when ``read_only=True``. This feature is in private preview.
151150
151+
:type begin_later: bool
152+
:param begin_later: (Optional) If True, the transaction will be started
153+
lazily (i.e. when the first RPC is made). If False,
154+
the transaction will be started as soon as the context manager
155+
is entered. `self.begin()` can also be called manually to begin
156+
the transaction at any time. Default is False.
157+
152158
:raises: :class:`ValueError` if read_time is specified when
153159
``read_only=False``.
154160
"""
155161

156162
_status = None
157163

158-
def __init__ 10000 (self, client, read_only=False, read_time=None):
164+
def __init__(self, client, read_only=False, read_time=None, begin_later=False):
159165
super(Transaction, self).__init__(client)
160166
self._id = None
167+
self._begin_later = begin_later
161168

162169
if read_only:
163170
if read_time is not None:
@@ -180,8 +187,8 @@ def __init__(self, client, read_only=False, read_time=None):
180187
def id(self):
181188
"""Getter for the transaction ID.
182189
183-
:rtype: str
184-
:returns: The ID of the current transaction.
190+
:rtype: bytes or None
191+
:returns: The ID of the current transaction, or None if not started.
185192
"""
186193
return self._id
187194

@@ -240,6 +247,21 @@ def begin(self, retry=None, timeout=None):
240247
self._status = self._ABORTED
241248
raise
242249

250+
def _begin_with_id(self, transaction_id):
251+
"""
252+
Attach newly created transaction to an existing transaction ID.
253+
254+
This is used when begin_later is True, when the first lookup request
255+
associated with this transaction creates a new transaction ID.
256+
257+
:type transaction_id: bytes
258+
:param transaction_id: ID of the transaction to attach to.
259+
"""
260+
if self._status is not self._INITIAL:
261+
raise ValueError("Transaction already begun.")
262+
self._id = transaction_id
263+
self._status = self._IN_PROGRESS
264+
243265
def rollback(self, retry=None, timeout=None):
244266
"""Rolls back the current transaction.
245267
@@ -258,6 +280,12 @@ def rollback(self, retry=None, timeout=None):
258280
Note that if ``retry`` is specified, the timeout applies
259281
to each individual attempt.
260282
"""
283+
# if transaction has not started, abort it
284+
if self._status == self._INITIAL:
285+
self._status = self._ABORTED
286+
self._id = None
287+
return None
288+
261289
kwargs = _make_retry_timeout_kwargs(retry, timeout)
262290

263291
try:
@@ -296,6 +324,15 @@ def commit(self, retry=None, timeout=None):
296324
Note that if ``retry`` is specified, the timeout applies
297325
to each individual attempt.
298326
"""
327+
# if transaction has not begun, either begin now, or abort if empty
328+
if self._status == self._INITIAL:
329+
if not self._mutations:
330+
self._status = self._ABORTED
331+
self._id = None
332+
return None
333+
else:
334+
self.begin()
335+
299336
kwargs = _make_retry_timeout_kwargs(retry, timeout)
300337

301338
try:
@@ -321,3 +358,18 @@ def put(self, entity):
321358
raise RuntimeError("Transaction is read only")
322359
else:
323360
super(Transaction, self).put(entity)
361+
362+
def __enter__(self):
363+
if not self._begin_later:
364+
self.begin()
365+
self._client._push_batch(self)
366+
return self
367+
368+
def _allow_mutations(self):
369+
"""
370+
Mutations can be added to a transaction if it is in IN_PROGRESS state,
371+
or if it is in INITIAL state and the begin_later flag is set.
372+
"""
373+
return self._status == self._IN_PROGRESS or (
374+
self._begin_later and self._status == self._INITIAL
375+
)

0 commit comments

Comments
 (0)
0