8000 feat: New function Client.flush (#250) · etherscan-io/sentry-python@1c41dfb · GitHub
[go: up one dir, main page]

Skip to content

Commit 1c41dfb

Browse files
authored
feat: New function Client.flush (getsentry#250)
* feat: New function Client.flush * fix: Fix tests * fix: Fix rq tests * fix: Fix AWS tests * fix: Flush behavior * fix: Remove unused import * Revert "fix: Flush behavior" This reverts commit 1ca2546. * Revert "fix: Remove unused import" This reverts commit 783592c.
1 parent 28133ca commit 1c41dfb

File tree

9 files changed

+46
-68
lines changed

9 files changed

+46
-68
lines changed

sentry_sdk/client.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -197,20 +197,29 @@ def capture_event(self, event, hint=None, scope=None):
197197
self.transport.capture_event(event)
198198
return rv
199199

200-
def close(self, timeout=None, shutdown_callback=None):
201-
"""Closes the client which shuts down the transport in an
202-
orderly manner.
203-
204-
The `shutdown_callback` is invoked with two arguments: the number of
205-
pending events and the configured shutdown timeout. For instance the
206-
default atexit integration will use this to render out a message on
207-
stderr.
200+
def close(self, timeout=None, callback=None):
201+
"""
202+
Close the client and shut down the transport. Arguments have the same
203+
semantics as `self.flush()`.
204+
"""
205+
if self.transport is not None:
206+
self.flush(timeout=timeout, callback=callback)
207+
self.transport.kill()
208+
self.transport = None
209+
210+
def flush(self, timeout=None, callback=None):
211+
"""
212+
Wait `timeout` seconds for the current events to be sent. If no
213+
`timeout` is provided, the `shutdown_timeout` option value is used.
214+
215+
The `callback` is invoked with two arguments: the number of pending
216+
events and the configured timeout. For instance the default atexit
217+
integration will use this to render out a message on stderr.
208218
"""
209219
if self.transport is not None:
210220
if timeout is None:
211221
timeout = self.options["shutdown_timeout"]
212-
self.transport.shutdown(timeout=timeout, callback=shutdown_callback)
213-
self.transport = None
222+
self.transport.flush(timeout=timeout, callback=callback)
214223

215224
def __enter__(self):
216225
return self

sentry_sdk/integrations/atexit.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sentry_sdk.integrations import Integration
1010

1111

12-
def default_shutdown_callback(pending, timeout):
12+
def default_callback(pending, timeout):
1313
"""This is the default shutdown callback that is set on the options.
1414
It prints out a message to stderr that informs the user that some events
1515
are still pending and the process is waiting for them to flush out.
@@ -29,7 +29,7 @@ class AtexitIntegration(Integration):
2929

3030
def __init__(self, callback=None):
3131
if callback is None:
32-
callback = default_shutdown_callback
32+
callback = default_callback
3333
self.callback = callback
3434

3535
@staticmethod
@@ -41,4 +41,4 @@ def _shutdown():
4141
integration = hub.get_integration(AtexitIntegration)
4242
if integration is not None:
4343
logger.debug("atexit: shutting down client")
44-
hub.client.close(shutdown_callback=integration.callback)
44+
hub.client.close(callback=integration.callback)

sentry_sdk/integrations/aws_lambda.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,8 @@ def _drain_queue():
4545
integration = hub.get_integration(AwsLambdaIntegration)
4646
if integration is not None:
4747
# Flush out the event queue before AWS kills the
48-
# process. This is not threadsafe.
49-
# make new transport with empty queue
50-
new_transport = hub.client.transport.copy()
51-
hub.client.close()
52-
hub.client.transport = new_transport
48+
# process.
49+
hub.client.flush()
5350

5451

5552
class AwsLambdaIntegration(Integration):

sentry_sdk/integrations/rq.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
3333
# We're inside of a forked process and RQ is
3434
# about to call `os._exit`. Make sure that our
3535
# events get sent out.
36-
#
37-
# Closing the client should not affect other jobs since
38-
# we're in a different process
39-
hub.client.close()
36+
hub.client.flush()
4037

4138
return rv
4239

sentry_sdk/transport.py

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,27 +37,14 @@ def capture_event(self, event):
3737
"""
3838
raise NotImplementedError()
3939

40-
def shutdown(self, timeout, callback=None):
41-
"""Initiates a controlled shutdown that should flush out pending
42-
events. The callback must be invoked with the number of pending
43-
events and the timeout if the shutting down would take some period
44-
of time (eg: not instant).
45-
"""
46-
self.kill()
40+
def flush(self, timeout, callback=None):
41+
"""Wait `timeout` seconds for the current events to be sent out."""
42+
pass
4743

4844
def kill(self):
4945
"""Forcefully kills the transport."""
5046
pass
5147

52-
def copy(self):
53-
"""Copy the transport.
54-
55-
The returned transport should behave completely independent from the
56-
previous one. It still may share HTTP connection pools, but not share
57-
any state such as internal queues.
58-
"""
59-
return self
60-
6148
def __del__(self):
6249
try:
6350
self.kill()
@@ -161,22 +148,15 @@ def send_event_wrapper():
161148

162149
self._worker.submit(send_event_wrapper)
163150

164-
def shutdown(self, timeout, callback=None):
165-
logger.debug("Shutting down HTTP transport orderly")
166-
if timeout <= 0:
167-
self._worker.kill()
168-
else:
169-
self._worker.shutdown(timeout, callback)
151+
def flush(self, timeout, callback=None):
152+
logger.debug("Flushing HTTP transport")
153+
if timeout > 0:
154+
self._worker.flush(timeout, callback)
170155

171156
def kill(self):
172157
logger.debug("Killing HTTP transport")
173158
self._worker.kill()
174159

175-
def copy(self):
176-
transport = type(self)(self.options)
177-
transport._pool = self._pool
178-
return transport
179-
180160

181161
class _FunctionTransport(Transport):
182162
def __init__(self, func):

sentry_sdk/worker.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,18 @@ def kill(self):
5959
self._thread = None
6060
self._thread_for_pid = None
6161

62-
def shutdown(self, timeout, callback=None):
63-
logger.debug("background worker got shutdown request")
62+
def flush(self, timeout, callback=None):
63+
logger.debug("background worker got flush request")
6464
with self._lock:
65-
if self.is_alive:
66-
self._queue.put_nowait(_TERMINATOR)
67-
if timeout > 0.0:
68-
self._wait_shutdown(timeout, callback)
69-
self._thread = None
70-
self._thread_for_pid = None
71-
logger.debug("background worker shut down")
65+
if self.is_alive and timeout > 0.0:
66+
self._wait_flush(timeout, callback)
67+
logger.debug("background worker flushed")
7268

73-
def _wait_shutdown(self, timeout, callback):
69+
def _wait_flush(self, timeout, callback):
7470
initial_timeout = min(0.1, timeout)
7571
if not self._timed_queue_join(initial_timeout):
7672
pending = self._queue.qsize()
77-
logger.debug("%d event(s) pending on shutdown", pending)
73+
logger.debug("%d event(s) pending on flush", pending)
7874
if callback is not None:
7975
callback(pending, timeout)
8076
self._timed_queue_join(timeout - initial_timeout)

tests/integrations/aws_lambda/test_aws.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def __init__(self):
2727
def capture_event(self, event):
2828
self._queue.append(event)
2929
30-
def shutdown(self, timeout, callback=None):
30+
def flush(self, timeout, callback=None):
3131
# Delay event output like this to test proper shutdown
3232
# Note that AWS Lambda trunchates the log output to 4kb, so you better
3333
# pray that your events are smaller than that or else tests start

tests/integrations/rq/test_rq.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ def capture_event(event):
5151
events_w.write(json.dumps(event).encode("utf-8"))
5252
events_w.write(b"\n")
5353

54-
def shutdown(timeout, callback=None):
55-
events_w.write(b"shutdown\n")
54+
def flush(timeout=None, callback=None):
55+
events_w.write(b"flush\n")
5656

5757
Hub.current.client.transport.capture_event = capture_event
58-
Hub.current.client.transport.shutdown = shutdown
58+
Hub.current.client.flush = flush
5959

6060
queue = rq.Queue(connection=FakeStrictRedis())
6161
worker = rq.Worker([queue], connection=queue.connection)
@@ -64,9 +64,8 @@ def shutdown(timeout, callback=None):
6464
worker.work(burst=True)
6565

6666
event = events_r.readline()
67-
shutdown = events_r.readline()
6867
event = json.loads(event.decode("utf-8"))
69-
assert shutdown == b"shutdown\n"
70-
7168
exception, = event["exception"]["values"]
7269
assert exception["type"] == "ZeroDivisionError"
70+
71+
assert events_r.readline() == b"flush\n"

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ envlist =
3232
{pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-pyramid-{1.3,1.4,1.5,1.6,1.7,1.8,1.9}
3333

3434
{pypy,py2.7,py3.5,py3.6}-rq-{0.6,0.7,0.8,0.9,0.10,0.11}
35-
{pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-rq-0.12
35+
{pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-rq-{0.12,0.13}
3636

3737
py3.7-aiohttp
3838
{py3.7,py3.8}-tornado-{5,6}

0 commit comments

Comments
 (0)
0