 from sentry_sdk import Hub, configure_scope
 from sentry_sdk.integrations.celery import CeleryIntegration
+from sentry_sdk._compat import text_type

 from celery import Celery, VERSION
 from celery.bin import worker
@@ -22,8 +23,11 @@ def inner(signal, f):

 @pytest.fixture
 def init_celery(sentry_init):
-    def inner(propagate_traces=True):
-        sentry_init(integrations=[CeleryIntegration(propagate_traces=propagate_traces)])
+    def inner(propagate_traces=True, **kwargs):
+        sentry_init(
+            integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
+            **kwargs
+        )
         celery = Celery(__name__)
         if VERSION < (4,):
             celery.conf.CELERY_ALWAYS_EAGER = True
@@ -39,22 +43,30 @@ def celery(init_celery):
     return init_celery()


-@pytest.mark.parametrize(
-    "invocation,expected_context",
-    [
-        [lambda task, x, y: task.delay(x, y), {"args": [1, 0], "kwargs": {}}],
-        [lambda task, x, y: task.apply_async((x, y)), {"args": [1, 0], "kwargs": {}}],
-        [
-            lambda task, x, y: task.apply_async(args=(x, y)),
-            {"args": [1, 0], "kwargs": {}},
-        ],
-        [
-            lambda task, x, y: task.apply_async(kwargs=dict(x=x, y=y)),
-            {"args": [], "kwargs": {"x": 1, "y": 0}},
-        ],
-    ],
+@pytest.fixture(
+    params=[
+        lambda task, x, y: (task.delay(x, y), {"args": [x, y], "kwargs": {}}),
+        lambda task, x, y: (task.apply_async((x, y)), {"args": [x, y], "kwargs": {}}),
+        lambda task, x, y: (
+            task.apply_async(args=(x, y)),
+            {"args": [x, y], "kwargs": {}},
+        ),
+        lambda task, x, y: (
+            task.apply_async(kwargs=dict(x=x, y=y)),
+            {"args": [], "kwargs": {"x": x, "y": y}},
+        ),
+    ]
 )
-def test_simple(capture_events, celery, invocation, expected_context):
+def celery_invocation(request):
+    """
+    Invokes a task in the multiple ways Celery allows (testing our apply_async monkeypatch).
+
+    Currently limited to a task signature of the form foo(x, y).
+    """
+    return request.param
+
+
+def test_simple(capture_events, celery, celery_invocation):
     events = capture_events()

     @celery.task(name="dummy_task")
@@ -63,8 +75,8 @@ def dummy_task(x, y):
         return x / y

     with Hub.current.span() as span:
-        invocation(dummy_task, 1, 2)
-        invocation(dummy_task, 1, 0)
+        celery_invocation(dummy_task, 1, 2)
+        _, expected_context = celery_invocation(dummy_task, 1, 0)

     event, = events
@@ -81,6 +93,53 @@ def dummy_task(x, y):
     assert exception["stacktrace"]["frames"][0]["vars"]["foo"] == "42"


+@pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"])
+def test_transaction_events(capture_events, init_celery, celery_invocation, task_fails):
+    celery = init_celery(traces_sample_rate=1.0)
+
+    @celery.task(name="dummy_task")
+    def dummy_task(x, y):
+        return x / y
+
+    # XXX: For some reason the first call does not get instrumented properly.
+    celery_invocation(dummy_task, 1, 1)
+
+    events = capture_events()
+
+    with Hub.current.span(transaction="submission") as span:
+        celery_invocation(dummy_task, 1, 0 if task_fails else 1)
+
+    if task_fails:
+        error_event = events.pop(0)
+        assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id
+        assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError"
+
+    execution_event, submission_event = events
+
+    assert execution_event["transaction"] == "dummy_task"
+    assert submission_event["transaction"] == "submission"
+
+    assert execution_event["type"] == submission_event["type"] == "transaction"
+    assert execution_event["contexts"]["trace"]["trace_id"] == span.trace_id
+    assert submission_event["contexts"]["trace"]["trace_id"] == span.trace_id
+
+    assert execution_event["spans"] == []
+    assert submission_event["spans"] == [
+        {
+            u"data": {},
+            u"description": u"dummy_task",
+            u"op": "celery.submit",
+            u"parent_span_id": submission_event["contexts"]["trace"]["span_id"],
+            u"same_process_as_parent": True,
+            u"span_id": submission_event["spans"][0]["span_id"],
+            u"start_timestamp": submission_event["spans"][0]["start_timestamp"],
+            u"tags": {u"error": False},
+            u"timestamp": submission_event["spans"][0]["timestamp"],
+            u"trace_id": text_type(span.trace_id),
+        }
+    ]
+
+
 def test_no_stackoverflows(celery):
     """We used to have a bug in the Celery integration where its monkeypatching
     was repeated for every task invocation, leading to stackoverflows.