10000 fix: Do not use Celery signals for scope pushing/popping · tb-lib/sentry-python@778e207 · GitHub
[go: up one dir, main page]

Skip to content

Commit 778e207

Browse files
committed
fix: Do not use Celery signals for scope pushing/popping
1 parent 3ac9097 commit 778e207

File tree

2 files changed

+84
-32
lines changed

2 files changed

+84
-32
lines changed

sentry_sdk/integrations/celery.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
import sys
44

5-
from celery.signals import task_failure, task_prerun, task_postrun
65
from celery.exceptions import SoftTimeLimitExceeded
76

87
from sentry_sdk.hub import Hub
98
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception
9+
from sentry_sdk._compat import reraise
1010
from sentry_sdk.integrations import Integration
1111
from sentry_sdk.integrations.logging import ignore_logger
1212

@@ -16,33 +16,42 @@ class CeleryIntegration(Integration):
1616

1717
@staticmethod
1818
def setup_once():
19-
task_prerun.connect(_handle_task_prerun, weak=False)
20-
task_postrun.connect(_handle_task_postrun, weak=False)
21-
task_failure.connect(_process_failure_signal, weak=False)
19+
import celery.app.trace as trace
20+
21+
old_build_tracer = trace.build_tracer
22+
23+
def sentry_build_tracer(name, task, *args, **kwargs):
24+
task.__call__ = _wrap_task_call(task, task.__call__)
25+
task.run = _wrap_task_call(task, task.run)
26+
return old_build_tracer(name, task, *args, **kwargs)
27+
28+
trace.build_tracer = sentry_build_tracer
2229

2330
# This logger logs every status of every task that ran on the worker.
2431
# Meaning that every task's breadcrumbs are full of stuff like "Task
2532
# <foo> raised unexpected <bar>".
2633
ignore_logger("celery.worker.job")
2734

2835

29-
def _process_failure_signal(sender, task_id, einfo, **kw):
30-
# einfo from celery is not reliable
31-
exc_info = sys.exc_info()
32-
33-
hub = Hub.current
34-
integration = hub.get_integration(CeleryIntegration)
35-
if integration is None:
36-
return
36+
def _wrap_task_call(self, f):
37+
def _inner(*args, **kwargs):
38+
hub = Hub.current
39+
if hub.get_integration(CeleryIntegration) is None:
40+
return f(*args, **kwargs)
3741

38-
_capture_event(hub, exc_info)
42+
with hub.configure_scope() as scope:
43+
if scope._name == "celery":
44+
return f(*args, **kwargs)
3945

46+
with hub.push_scope() as scope:
47+
scope._name = "celery"
48+
scope.add_event_processor(_make_event_processor(args, kwargs, self))
49+
try:
50+
return f(*args, **kwargs)
51+
except Exception:
52+
reraise(*_capture_exception(hub))
4053

41-
def _handle_task_prerun(sender, task, args, kwargs, **_):
42-
hub = Hub.current
43-
if hub.get_integration(CeleryIntegration) is not None:
44-
scope = hub.push_scope().__enter__()
45-
scope.add_event_processor(_make_event_processor(args, kwargs, task))
54+
return _inner
4655

4756

4857
def _make_event_processor(args, kwargs, task):
@@ -78,16 +87,12 @@ def event_processor(event, hint):
7887
return event_processor
7988

8089

81-
def _handle_task_postrun(sender, task_id, task, **kw):
82-
hub = Hub.current
83-
if hub.get_integration(CeleryIntegration) is not None:
84-
hub.pop_scope_unsafe()
85-
86-
87-
def _capture_event(hub, exc_info):
90+
def _capture_exception(hub):
91+
exc_info = sys.exc_info()
8892
event, hint = event_from_exception(
8993
exc_info,
9094
client_options=hub.client.options,
9195
mechanism={"type": "celery", "handled": False},
9296
)
9397
hub.capture_event(event, hint=hint)
98+
return exc_info

tests/integrations/celery/test_celery.py

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,35 @@
22

33
pytest.importorskip("celery")
44

5+
from sentry_sdk import Hub, configure_scope
56
from sentry_sdk.integrations.celery import CeleryIntegration
67

7-
from celery import Celery
8+
from celery import Celery, VERSION
89

910

1011
@pytest.fixture
11-
def celery(sentry_init):
12-
sentry_init(integrations=[CeleryIntegration()])
13-
celery = Celery(__name__)
14-
celery.conf.CELERY_ALWAYS_EAGER = True
15-
return celery
12+
def connect_signal(request):
13+
def inner(signal, f):
14+
signal.connect(f)
15+
request.addfinalizer(lambda: signal.disconnect(f))
16+
17+
return inner
18+
19+
20+
@pytest.fixture
21+
def init_celery(sentry_init):
22+
def inner():
23+
sentry_init(integrations=[CeleryIntegration()])
24+
celery = Celery(__name__)
25+
celery.conf.CELERY_ALWAYS_EAGER = True
26+
return celery
27+
28+
return inner
29+
30+
31+
@pytest.fixture
32+
def celery(init_celery):
33+
return init_celery()
1634

1735

1836
def test_simple(capture_events, celery):
@@ -36,7 +54,7 @@ def dummy_task(x, y):
3654
exception, = event["exception"]["values"]
3755
assert exception["type"] == "ZeroDivisionError"
3856
assert exception["mechanism"]["type"] == "celery"
39-
assert exception["stacktrace"]["frames"][1]["vars"]["foo"] == "42"
57+
assert exception["stacktrace"]["frames"][0]["vars"]["foo"] == "42"
4058

4159

4260
def test_ignore_expected(capture_events, celery):
@@ -49,3 +67,32 @@ def dummy_task(x, y):
4967
dummy_task.delay(1, 2)
5068
dummy_task.delay(1, 0)
5169
assert not events
70+
71+
72+
def test_broken_prerun(capture_events, init_celery, connect_signal):
73+
from celery.signals import task_prerun, task_postrun
74+
75+
def crash(*args, **kwargs):
76+
1 / 0
77+
78+
# Order here is important to reproduce the bug: In Celery 3, a crashing
79+
# prerun would prevent other preruns from running.
80+
81+
connect_signal(task_prerun, crash)
82+
celery = init_celery()
83+
84+
events = capture_events()
85+
assert len(Hub.current._stack) == 1
86+
87+
@celery.task(name="dummy_task")
88+
def dummy_task(x, y):
89+
assert len(Hub.current._stack) == 2
90+
return x / y
91+
92+
try:
93+
result = dummy_task.delay(2, 2)
94+
except ZeroDivisionError:
95+
if VERSION >= (4,):
96+
raise
97+
98+
assert len(Hub.current._stack) == 1

0 commit comments

Comments
 (0)
0