fix(celery): flush queue on worker exit (#295) · etherscan-io/sentry-python@085128e · GitHub
[go: up one dir, main page]

Skip to content

Commit 085128e

Browse files
authored
fix(celery): flush queue on worker exit (getsentry#295)
* fix(celery): flush queue on worker exit Fix getsentry#285 * fix: Fix tests * fix: Linters
1 parent c96119f commit 085128e

File tree

4 files changed

+134
-51
lines changed

4 files changed

+134
-51
lines changed

sentry_sdk/integrations/celery.py

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ def setup_once():
2323
def sentry_build_tracer(name, task, *args, **kwargs):
2424
# Need to patch both methods because older celery sometimes
2525
# short-circuits to task.run if it thinks it's safe.
26-
task.__call__ = _wrap_task_call(task.__call__)
27-
task.run = _wrap_task_call(task.run)
26+
task.__call__ = _wrap_task_call(task, task.__call__)
27+
task.run = _wrap_task_call(task, task.run)
2828
return _wrap_tracer(task, old_build_tracer(name, task, *args, **kwargs))
2929

3030
trace.build_tracer = sentry_build_tracer
3131

32+
_patch_worker_exit()
33+
3234
# This logger logs every status of every task that ran on the worker.
3335
# Meaning that every task's breadcrumbs are full of stuff like "Task
3436
# <foo> raised unexpected <bar>".
@@ -57,14 +59,17 @@ def _inner(*args, **kwargs):
5759
return _inner
5860

5961

60-
def _wrap_task_call(f):
62+
def _wrap_task_call(task, f):
6163
# Need to wrap task call because the exception is caught before we get to
6264
# see it. Also celery's reported stacktrace is untrustworthy.
6365
def _inner(*args, **kwargs):
6466
try:
6567
return f(*args, **kwargs)
6668
except Exception:
67-
reraise(*_capture_exception())
69+
exc_info = sys.exc_info()
70+
with capture_internal_exceptions():
71+
_capture_exception(task, exc_info)
72+
reraise(*exc_info)
6873

6974
return _inner
7075

@@ -83,15 +88,6 @@ def event_processor(event, hint):
8388
}
8489

8590
if "exc_info" in hint:
86-
with capture_internal_exceptions():
87-
if isinstance(hint["exc_info"][1], Retry):
88-
return None
89-
90-
if hasattr(task, "throws") and isinstance(
91-
hint["exc_info"][1], task.throws
92-
):
93-
return None
94-
9591
with capture_internal_exceptions():
9692
if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded):
9793
event["fingerprint"] = [
@@ -105,16 +101,39 @@ def event_processor(event, hint):
105101
return event_processor
106102

107103

108-
def _capture_exception():
104+
def _capture_exception(task, exc_info):
109105
hub = Hub.current
110-
exc_info = sys.exc_info()
111106

112-
if hub.get_integration(CeleryIntegration) is not None:
113-
event, hint = event_from_exception(
114-
exc_info,
115-
client_options=hub.client.options,
116-
mechanism={"type": "celery", "handled": False},
117-
)
118-
hub.capture_event(event, hint=hint)
107+
if hub.get_integration(CeleryIntegration) is None:
108+
return
109+
if isinstance(exc_info[1], Retry):
110+
return
111+
if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
112+
return
113+
114+
event, hint = event_from_exception(
115+
exc_info,
116+
client_options=hub.client.options,
117+
mechanism={"type": "celery", "handled": False},
118+
)
119+
120+
hub.capture_event(event, hint=hint)
121+
122+
123+
def _patch_worker_exit():
124+
# Need to flush queue before worker shutdown because a crashing worker will
125+
# call os._exit
126+
from billiard.pool import Worker # type: ignore
127+
128+
old_workloop = Worker.workloop
129+
130+
def sentry_workloop(*args, **kwargs):
131+
try:
132+
return old_workloop(*args, **kwargs)
133+
finally:
134+
with capture_internal_exceptions():
135+
hub = Hub.current
136+
if hub.get_integration(CeleryIntegration) is not None:
137+
hub.flush()
119138

120-
return exc_info
139+
Worker.workloop = sentry_workloop

tests/conftest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,41 @@ def append(event):
122122
return events
123123

124124
return inner
125+
126+
127+
@pytest.fixture
128+
def capture_events_forksafe(monkeypatch):
129+
def inner():
130+
events_r, events_w = os.pipe()
131+
events_r = os.fdopen(events_r, "rb", 0)
132+
events_w = os.fdopen(events_w, "wb", 0)
133+
134+
test_client = sentry_sdk.Hub.current.client
135+
136+
old_capture_event = test_client.transport.capture_event
137+
138+
def append(event):
139+
events_w.write(json.dumps(event).encode("utf-8"))
140+
events_w.write(b"\n")
141+
return old_capture_event(event)
142+
143+
def flush(timeout=None, callback=None):
144+
events_w.write(b"flush\n")
145+
146+
monkeypatch.setattr(test_client.transport, "capture_event", append)
147+
monkeypatch.setattr(test_client, "flush", flush)
148+
149+
return EventStreamReader(events_r)
150+
151+
return inner
152+
153+
154+
class EventStreamReader(object):
155+
def __init__(self, file):
156+
self.file = file
157+
158+
def read_event(self):
159+
return json.loads(self.file.readline().decode("utf-8"))
160+
161+
def read_flush(self):
162+
assert self.file.readline() == b"flush\n"

tests/integrations/celery/test_celery.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import threading
2+
13
import pytest
24

35
pytest.importorskip("celery")
@@ -6,6 +8,7 @@
68
from sentry_sdk.integrations.celery import CeleryIntegration
79

810
from celery import Celery, VERSION
11+
from celery.bin import worker
912

1013

1114
@pytest.fixture
@@ -22,7 +25,10 @@ def init_celery(sentry_init):
2225
def inner():
2326
sentry_init(integrations=[CeleryIntegration()])
2427
celery = Celery(__name__)
25-
celery.conf.CELERY_ALWAYS_EAGER = True
28+
if VERSION < (4,):
29+
celery.conf.CELERY_ALWAYS_EAGER = True
30+
else:
31+
celery.conf.task_always_eager = True
2632
return celery
2733

2834
return inner
@@ -92,11 +98,11 @@ def dummy_task(x, y):
9298
stack_lengths.append(len(Hub.current._stack))
9399
return x / y
94100

95-
try:
101+
if VERSION >= (4,):
96102
dummy_task.delay(2, 2)
97-
except ZeroDivisionError:
98-
if VERSION >= (4,):
99-
raise
103+
else:
104+
with pytest.raises(ZeroDivisionError):
105+
dummy_task.delay(2, 2)
100106

101107
assert len(Hub.current._stack) == 1
102108
if VERSION < (4,):
@@ -139,3 +145,41 @@ def dummy_task(self):
139145

140146
for e in exceptions:
141147
assert e["type"] == "ZeroDivisionError"
148+
149+
150+
@pytest.mark.skipif(VERSION < (4,), reason="in-memory backend broken")
151+
def test_transport_shutdown(request, celery, capture_events_forksafe, tmpdir):
152+
events = capture_events_forksafe()
153+
154+
celery.conf.worker_max_tasks_per_child = 1
155+
celery.conf.broker_url = "memory://localhost/"
156+
celery.conf.broker_backend = "memory"
157+
celery.conf.result_backend = "file://{}".format(tmpdir.mkdir("celery-results"))
158+
celery.conf.task_always_eager = False
159+
160+
runs = []
161+
162+
@celery.task(name="dummy_task", bind=True)
163+
def dummy_task(self):
164+
runs.append(1)
165+
1 / 0
166+
167+
res = dummy_task.delay()
168+
169+
w = worker.worker(app=celery)
170+
t = threading.Thread(target=w.run)
171+
t.daemon = True
172+
t.start()
173+
174+
with pytest.raises(Exception):
175+
# Celery 4.1 raises a gibberish exception
176+
res.wait()
177+
178+
event = events.read_event()
179+
exception, = event["exception"]["values"]
180+
assert exception["type"] == "ZeroDivisionError"
181+
182+
events.read_flush()
183+
184+
# if this is nonempty, the worker never really forked
185+
assert not runs

tests/integrations/rq/test_rq.py

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
from sentry_sdk.integrations.rq import RqIntegration
22

3-
import os
4-
import json
5-
63
from fakeredis import FakeStrictRedis
74
import rq
85

9-
from sentry_sdk import Hub
10-
116

127
def crashing_job(foo):
138
1 / 0
@@ -40,32 +35,19 @@ def test_basic(sentry_init, capture_events):
4035
}
4136

4237

43-
def test_transport_shutdown(sentry_init):
38+
def test_transport_shutdown(sentry_init, capture_events_forksafe):
4439
sentry_init(integrations=[RqIntegration()])
4540

46-
events_r, events_w = os.pipe()
47-
events_r = os.fdopen(events_r, "rb", 0)
48-
events_w = os.fdopen(events_w, "wb", 0)
49-
50-
def capture_event(event):
51-
events_w.write(json.dumps(event).encode("utf-8"))
52-
events_w.write(b"\n")
53-
54-
def flush(timeout=None, callback=None):
55-
events_w.write(b"flush\n")
56-
57-
Hub.current.client.transport.capture_event = capture_event
58-
Hub.current.client.flush = flush
41+
events = capture_events_forksafe()
5942

6043
queue = rq.Queue(connection=FakeStrictRedis())
6144
worker = rq.Worker([queue], connection=queue.connection)
6245

6346
queue.enqueue(crashing_job, foo=42)
6447
worker.work(burst=True)
6548

66-
event = events_r.readline()
67-
event = json.loads(event.decode("utf-8"))
49+
event = events.read_event()
50+
events.read_flush()
51+
6852
exception, = event["exception"]["values"]
6953
assert exception["type"] == "ZeroDivisionError"
70-
71-
assert events_r.readline() == b"flush\n"

0 commit comments

Comments (0)