2
2
3
3
import sys
4
4
5
- from celery .signals import task_failure , task_prerun , task_postrun
6
5
from celery .exceptions import SoftTimeLimitExceeded
7
6
8
7
from sentry_sdk .hub import Hub
9
8
from sentry_sdk .utils import capture_internal_exceptions , event_from_exception
9
+ from sentry_sdk ._compat import reraise
10
10
from sentry_sdk .integrations import Integration
11
11
from sentry_sdk .integrations .logging import ignore_logger
12
12
@@ -16,33 +16,42 @@ class CeleryIntegration(Integration):
16
16
17
17
@staticmethod
18
18
def setup_once ():
19
- task_prerun .connect (_handle_task_prerun , weak = False )
20
- task_postrun .connect (_handle_task_postrun , weak = False )
21
- task_failure .connect (_process_failure_signal , weak = False )
19
+ import celery .app .trace as trace
20
+
21
+ old_build_tracer = trace .build_tracer
22
+
23
+ def sentry_build_tracer (name , task , * args , ** kwargs ):
24
+ task .__call__ = _wrap_task_call (task , task .__call__ )
25
+ task .run = _wrap_task_call (task , task .run )
26
+ return old_build_tracer (name , task , * args , ** kwargs )
27
+
28
+ trace .build_tracer = sentry_build_tracer
22
29
23
30
# This logger logs every status of every task that ran on the worker.
24
31
# Meaning that every task's breadcrumbs are full of stuff like "Task
25
32
# <foo> raised unexpected <bar>".
26
33
ignore_logger ("celery.worker.job" )
27
34
28
35
29
- def _process_failure_signal (sender , task_id , einfo , ** kw ):
30
- # einfo from celery is not reliable
31
- exc_info = sys .exc_info ()
32
-
33
- hub = Hub .current
34
- integration = hub .get_integration (CeleryIntegration )
35
- if integration is None :
36
- return
36
+ def _wrap_task_call (self , f ):
37
+ def _inner (* args , ** kwargs ):
38
+ hub = Hub .current
39
+ if hub .get_integration (CeleryIntegration ) is None :
40
+ return f (* args , ** kwargs )
37
41
38
- _capture_event (hub , exc_info )
42
+ with hub .configure_scope () as scope :
43
+ if scope ._name == "celery" :
44
+ return f (* args , ** kwargs )
39
45
46
+ with hub .push_scope () as scope :
47
+ scope ._name = "celery"
48
+ scope .add_event_processor (_make_event_processor (args , kwargs , self ))
49
+ try :
50
+ return f (* args , ** kwargs )
51
+ except Exception :
52
+ reraise (* _capture_exception (hub ))
40
53
41
- def _handle_task_prerun (sender , task , args , kwargs , ** _ ):
42
- hub = Hub .current
43
- if hub .get_integration (CeleryIntegration ) is not None :
44
- scope = hub .push_scope ().__enter__ ()
45
- scope .add_event_processor (_make_event_processor (args , kwargs , task ))
54
+ return _inner
46
55
47
56
48
57
def _make_event_processor (args , kwargs , task ):
@@ -78,16 +87,12 @@ def event_processor(event, hint):
78
87
return event_processor
79
88
80
89
81
- def _handle_task_postrun (sender , task_id , task , ** kw ):
82
- hub = Hub .current
83
- if hub .get_integration (CeleryIntegration ) is not None :
84
- hub .pop_scope_unsafe ()
85
-
86
-
87
- def _capture_event (hub , exc_info ):
90
+ def _capture_exception (hub ):
91
+ exc_info = sys .exc_info ()
88
92
event , hint = event_from_exception (
89
93
exc_info ,
90
94
client_options = hub .client .options ,
91
95
mechanism = {"type" : "celery" , "handled" : False },
92
96
)
93
97
hub .capture_event (event , hint = hint )
98
+ return exc_info
0 commit comments