feat: Capture spans for Celery's apply_async (#454) · etherscan-io/sentry-python@b25ddfb · GitHub

Commit b25ddfb

dcramer and untitaker authored and committed
feat: Capture spans for Celery's apply_async (getsentry#454)
* feat: Capture spans for Celery's apply_async
* test: Add basic test for submission span
1 parent 54a4556 commit b25ddfb

2 files changed: +83 -20 lines

sentry_sdk/integrations/celery.py

Lines changed: 5 additions & 1 deletion
@@ -77,7 +77,11 @@ def apply_async(*args, **kwargs):
                 headers[key] = value
             if headers is not None:
                 kwargs["headers"] = headers
-        return f(*args, **kwargs)
+
+            with hub.span(op="celery.submit", description=task.name):
+                return f(*args, **kwargs)
+        else:
+            return f(*args, **kwargs)
 
     return apply_async
 
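For illustration (not part of this diff): a minimal sketch of application code that would produce the new celery.submit span, assuming a reachable broker and the Hub.span-based tracing used elsewhere in this commit. The DSN, broker URL, task name, and transaction name below are placeholders.

import sentry_sdk
from sentry_sdk import Hub
from sentry_sdk.integrations.celery import CeleryIntegration

from celery import Celery

# Placeholder DSN and broker URL, for illustration only.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    integrations=[CeleryIntegration()],
    traces_sample_rate=1.0,
)

app = Celery(__name__, broker="redis://localhost:6379/0")


@app.task(name="add")
def add(x, y):
    return x + y


# Submitting the task inside a span: the patched apply_async opens a child
# span with op="celery.submit" and description set to the task name ("add").
with Hub.current.span(transaction="enqueue-work"):
    add.delay(1, 2)
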
tests/integrations/celery/test_celery.py

Lines changed: 78 additions & 19 deletions
@@ -6,6 +6,7 @@
 
 from sentry_sdk import Hub, configure_scope
 from sentry_sdk.integrations.celery import CeleryIntegration
+from sentry_sdk._compat import text_type
 
 from celery import Celery, VERSION
 from celery.bin import worker
@@ -22,8 +23,11 @@ def inner(signal, f):
 
 @pytest.fixture
 def init_celery(sentry_init):
-    def inner(propagate_traces=True):
-        sentry_init(integrations=[CeleryIntegration(propagate_traces=propagate_traces)])
+    def inner(propagate_traces=True, **kwargs):
+        sentry_init(
+            integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
+            **kwargs
+        )
         celery = Celery(__name__)
         if VERSION < (4,):
             celery.conf.CELERY_ALWAYS_EAGER = True
@@ -39,22 +43,30 @@ def celery(init_celery):
     return init_celery()
 
 
-@pytest.mark.parametrize(
-    "invocation,expected_context",
-    [
-        [lambda task, x, y: task.delay(x, y), {"args": [1, 0], "kwargs": {}}],
-        [lambda task, x, y: task.apply_async((x, y)), {"args": [1, 0], "kwargs": {}}],
-        [
-            lambda task, x, y: task.apply_async(args=(x, y)),
-            {"args": [1, 0], "kwargs": {}},
-        ],
-        [
-            lambda task, x, y: task.apply_async(kwargs=dict(x=x, y=y)),
-            {"args": [], "kwargs": {"x": 1, "y": 0}},
-        ],
-    ],
+@pytest.fixture(
+    params=[
+        lambda task, x, y: (task.delay(x, y), {"args": [x, y], "kwargs": {}}),
+        lambda task, x, y: (task.apply_async((x, y)), {"args": [x, y], "kwargs": {}}),
+        lambda task, x, y: (
+            task.apply_async(args=(x, y)),
+            {"args": [x, y], "kwargs": {}},
+        ),
+        lambda task, x, y: (
+            task.apply_async(kwargs=dict(x=x, y=y)),
+            {"args": [], "kwargs": {"x": x, "y": y}},
+        ),
+    ]
 )
-def test_simple(capture_events, celery, invocation, expected_context):
+def celery_invocation(request):
+    """
+    Invokes a task in multiple ways Celery allows you to (testing our apply_async monkeypatch).
+
+    Currently limited to a task signature of the form foo(x, y)
+    """
+    return request.param
+
+
+def test_simple(capture_events, celery, celery_invocation):
     events = capture_events()
 
     @celery.task(name="dummy_task")
@@ -63,8 +75,8 @@ def dummy_task(x, y):
         return x / y
 
     with Hub.current.span() as span:
-        invocation(dummy_task, 1, 2)
-        invocation(dummy_task, 1, 0)
+        celery_invocation(dummy_task, 1, 2)
+        _, expected_context = celery_invocation(dummy_task, 1, 0)
 
     event, = events
 
@@ -81,6 +93,53 @@ def dummy_task(x, y):
     assert exception["stacktrace"]["frames"][0]["vars"]["foo"] == "42"
 
 
+@pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"])
+def test_transaction_events(capture_events, init_celery, celery_invocation, task_fails):
+    celery = init_celery(traces_sample_rate=1.0)
+
+    @celery.task(name="dummy_task")
+    def dummy_task(x, y):
+        return x / y
+
+    # XXX: For some reason the first call does not get instrumented properly.
+    celery_invocation(dummy_task, 1, 1)
+
+    events = capture_events()
+
+    with Hub.current.span(transaction="submission") as span:
+        celery_invocation(dummy_task, 1, 0 if task_fails else 1)
+
+    if task_fails:
+        error_event = events.pop(0)
+        assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id
+        assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError"
+
+    execution_event, submission_event = events
+
+    assert execution_event["transaction"] == "dummy_task"
+    assert submission_event["transaction"] == "submission"
+
+    assert execution_event["type"] == submission_event["type"] == "transaction"
+    assert execution_event["contexts"]["trace"]["trace_id"] == span.trace_id
+    assert submission_event["contexts"]["trace"]["trace_id"] == span.trace_id
+
+    assert execution_event["spans"] == []
+    assert submission_event["spans"] == [
+        {
+            u"data": {},
+            u"description": u"dummy_task",
+            u"op": "celery.submit",
+            u"parent_span_id": submission_event["contexts"]["trace"]["span_id"],
+            u"same_process_as_parent": True,
+            u"span_id": submission_event["spans"][0]["span_id"],
+            u"start_timestamp": submission_event["spans"][0]["start_timestamp"],
+            u"tags": {u"error": False},
+            u"timestamp": submission_event["spans"][0]["timestamp"],
+            u"trace_id": text_type(span.trace_id),
+        }
+    ]
+
+
 def test_no_stackoverflows(celery):
     """We used to have a bug in the Celery integration where its monkeypatching
     was repeated for every task invocation, leading to stackoverflows.
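
As an aside, the trace_id and parent_span_id relationships asserted in test_transaction_events follow from Hub.span nesting alone; a rough sketch of that relationship, assuming nested Hub.current.span calls link child spans to the enclosing one the way this test relies on (no Celery involved):

import sentry_sdk
from sentry_sdk import Hub

sentry_sdk.init(traces_sample_rate=1.0)  # no DSN configured, so nothing is sent

# The outer span plays the role of the "submission" transaction; the inner one
# stands in for the celery.submit span opened by the patched apply_async.
with Hub.current.span(transaction="submission") as outer:
    with Hub.current.span(op="celery.submit", description="dummy_task") as inner:
        assert inner.trace_id == outer.trace_id
        assert inner.parent_span_id == outer.span_id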
