@@ -62,15 +62,23 @@ class Batch(base.Batch):
6262 settings (~.pubsub_v1.types.BatchSettings): The settings for batch
6363 publishing. These should be considered immutable once the batch
6464 has been opened.
65- autocommit (bool): Whether to autocommit the batch when the time
66- has elapsed. Defaults to True unless ``settings.max_latency`` is
67- inf.
65+ batch_done_callback (Callable[[bool], Any]): Callback called when the
66+ response for a batch publish has been received. Called with one
67+ boolean argument: successfully published or a permanent error
68+ occurred. Temporary errors are not surfaced because they are retried
69+ at a lower level.
70+ commit_when_full (bool): Whether to commit the batch when the batch
71+ is full.
6872 """
6973
70- def __init__ (self , client , topic , settings , autocommit = True ):
74+ def __init__ (
75+ self , client , topic , settings , batch_done_callback = None , commit_when_full = True
76+ ):
7177 self ._client = client
7278 self ._topic = topic
7379 self ._settings = settings
80+ self ._batch_done_callback = batch_done_callback
81+ self ._commit_when_full = commit_when_full
7482
7583 self ._state_lock = threading .Lock ()
7684 # These members are all communicated between threads; ensure that
@@ -87,15 +95,6 @@ def __init__(self, client, topic, settings, autocommit=True):
8795 self ._base_request_size = types .PublishRequest (topic = topic ).ByteSize ()
8896 self ._size = self ._base_request_size
8997
90- # If max latency is specified, start a thread to monitor the batch and
91- # commit when the max latency is reached.
92- self ._thread = None
93- if autocommit and self .settings .max_latency < float ("inf" ):
94- self ._thread = threading .Thread (
95- name = "Thread-MonitorBatchPublisher" , target = self .monitor
96- )
97- self ._thread .start ()
98-
9998 @staticmethod
10099 def make_lock ():
101100 """Return a threading lock.
@@ -148,6 +147,27 @@ def status(self):
148147 """
149148 return self ._status
150149
150+ def cancel (self , cancellation_reason ):
151+ """Complete pending futures with an exception.
152+
153+ This method must be called before publishing starts (ie: while the
154+ batch is still accepting messages.)
155+
156+ Args:
157+ cancellation_reason (BatchCancellationReason): The reason why this
158+ batch has been cancelled.
159+ """
160+
161+ with self ._state_lock :
162+ assert (
163+ self ._status == base .BatchStatus .ACCEPTING_MESSAGES
164+ ), "Cancel should not be called after sending has started."
165+
166+ exc = RuntimeError (cancellation_reason .value )
167+ for future in self ._futures :
168+ future .set_exception (exc )
169+ self ._status = base .BatchStatus .ERROR
170+
151171 def commit (self ):
152172 """Actually publish all of the messages on the active batch.
153173
@@ -162,6 +182,7 @@ def commit(self):
162182 If the current batch is **not** accepting messages, this method
163183 does nothing.
164184 """
185+
165186 # Set the status to "starting" synchronously, to ensure that
166187 # this batch will necessarily not accept new messages.
167188 with self ._state_lock :
@@ -170,7 +191,11 @@ def commit(self):
170191 else :
171192 return
172193
173- # Start a new thread to actually handle the commit.
194+ self ._start_commit_thread ()
195+
196+ def _start_commit_thread (self ):
197+ """Start a new thread to actually handle the commit."""
198+
174199 commit_thread = threading .Thread (
175200 name = "Thread-CommitBatchPublisher" , target = self ._commit
176201 )
@@ -195,7 +220,10 @@ def _commit(self):
195220 # If, in the intervening period between when this method was
196221 # called and now, the batch started to be committed, or
197222 # completed a commit, then no-op at this point.
198- _LOGGER .debug ("Batch is already in progress, exiting commit" )
223+ _LOGGER .debug (
224+ "Batch is already in progress or has been cancelled, "
225+ "exiting commit"
226+ )
199227 return
200228
201229 # Once in the IN_PROGRESS state, no other thread can publish additional
@@ -215,16 +243,24 @@ def _commit(self):
215243 # Log how long the underlying request takes.
216244 start = time .time ()
217245
246+ batch_transport_succeeded = True
218247 try :
248+ # Performs retries for errors defined in retry_codes.publish in the
249+ # publisher_client_config.py file.
219250 response = self ._client .api .publish (self ._topic , self ._messages )
220251 except google .api_core .exceptions .GoogleAPIError as exc :
221- # We failed to publish, set the exception on all futures and
222- # exit.
252+ # We failed to publish, even after retries, so set the exception on
253+ # all futures and exit.
223254 self ._status = base .BatchStatus .ERROR
224255
225256 for future in self ._futures :
226257 future .set_exception (exc )
227258
259+ batch_transport_succeeded = False
260+ if self ._batch_done_callback is not None :
261+ # Failed to publish batch.
262+ self ._batch_done_callback (batch_transport_succeeded )
263+
228264 _LOGGER .exception ("Failed to publish %s messages." , len (self ._futures ))
229265 return
230266
@@ -250,26 +286,17 @@ def _commit(self):
250286 for future in self ._futures :
251287 future .set_exception (exception )
252288
289+ # Unknown error -> batch failed to be correctly transported/
290+ batch_transport_succeeded = False
291+
253292 _LOGGER .error (
254293 "Only %s of %s messages were published." ,
255294 len (response .message_ids ),
256295 len (self ._futures ),
257296 )
258297
259- def monitor (self ):
260- """Commit this batch after sufficient time has elapsed.
261-
262- This simply sleeps for ``self.settings.max_latency`` seconds,
263- and then calls commit unless the batch has already been committed.
264- """
265- # NOTE: This blocks; it is up to the calling code to call it
266- # in a separate thread.
267-
268- # Sleep for however long we should be waiting.
269- time .sleep (self .settings .max_latency )
270-
271- _LOGGER .debug ("Monitor is waking up" )
272- return self ._commit ()
298+ if self ._batch_done_callback is not None :
299+ self ._batch_done_callback (batch_transport_succeeded )
273300
274301 def publish (self , message ):
275302 """Publish a single message.
@@ -294,13 +321,18 @@ def publish(self, message):
294321 pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
295322 the ``message`` would exceed the max size limit on the backend.
296323 """
324+
297325 # Coerce the type, just in case.
298326 if not isinstance (message , types .PubsubMessage ):
299327 message = types .PubsubMessage (** message )
300328
301329 future = None
302330
303331 with self ._state_lock :
332+ assert (
333+ self ._status != base .BatchStatus .ERROR
334+ ), "Publish after stop() or publish error."
335+
304336 if not self .will_accept (message ):
305337 return future
306338
@@ -333,7 +365,7 @@ def publish(self, message):
4A96
333 365
334366 # Try to commit, but it must be **without** the lock held, since
335367 # ``commit()`` will try to obtain the lock.
336- if overflow :
368+ if self . _commit_when_full and overflow :
337369 self .commit ()
338370
339371 return future
0 commit comments