1
+ from __future__ import annotations
1
2
import sys
2
3
from collections .abc import Mapping
3
4
from functools import wraps
@@ -62,11 +63,10 @@ class CeleryIntegration(Integration):
62
63
63
64
def __init__ (
64
65
self ,
65
- propagate_traces = True ,
66
- monitor_beat_tasks = False ,
67
- exclude_beat_tasks = None ,
68
- ):
69
- # type: (bool, bool, Optional[List[str]]) -> None
66
+ propagate_traces : bool = True ,
67
+ monitor_beat_tasks : bool = False ,
68
+ exclude_beat_tasks : Optional [List [str ]] = None ,
69
+ ) -> None :
70
70
self .propagate_traces = propagate_traces
71
71
self .monitor_beat_tasks = monitor_beat_tasks
72
72
self .exclude_beat_tasks = exclude_beat_tasks
@@ -76,8 +76,7 @@ def __init__(
76
76
_setup_celery_beat_signals (monitor_beat_tasks )
77
77
78
78
@staticmethod
79
- def setup_once ():
80
- # type: () -> None
79
+ def setup_once () -> None :
81
80
_check_minimum_version (CeleryIntegration , CELERY_VERSION )
82
81
83
82
_patch_build_tracer ()
@@ -97,16 +96,14 @@ def setup_once():
97
96
ignore_logger ("celery.redirected" )
98
97
99
98
100
- def _set_status (status ):
101
- # type: (str) -> None
99
+ def _set_status (status : str ) -> None :
102
100
with capture_internal_exceptions ():
103
101
span = sentry_sdk .get_current_span ()
104
102
if span is not None :
105
103
span .set_status (status )
106
104
107
105
108
- def _capture_exception (task , exc_info ):
109
- # type: (Any, ExcInfo) -> None
106
+ def _capture_exception (task : Any , exc_info : ExcInfo ) -> None :
110
107
client = sentry_sdk .get_client ()
111
108
if client .get_integration (CeleryIntegration ) is None :
112
109
return
@@ -129,10 +126,10 @@ def _capture_exception(task, exc_info):
129
126
sentry_sdk .capture_event (event , hint = hint )
130
127
131
128
132
- def _make_event_processor (task , uuid , args , kwargs , request = None ):
133
- # type: ( Any, Any, Any, Any, Optional[Any]) -> EventProcessor
134
- def event_processor ( event , hint ) :
135
- # type: ( Event, Hint) -> Optional[Event]
129
+ def _make_event_processor (
130
+ task : Any , uuid : Any , args : Any , kwargs : Any , request : Optional [Any ] = None
131
+ ) -> EventProcessor :
132
+ def event_processor ( event : Event , hint : Hint ) -> Optional [Event ]:
136
133
137
134
with capture_internal_exceptions ():
138
135
tags = event .setdefault ("tags" , {})
@@ -158,8 +155,9 @@ def event_processor(event, hint):
158
155
return event_processor
159
156
160
157
161
- def _update_celery_task_headers (original_headers , span , monitor_beat_tasks ):
162
- # type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any]
158
+ def _update_celery_task_headers (
159
+ original_headers : dict [str , Any ], span : Optional [Span ], monitor_beat_tasks : bool
160
+ ) -> dict [str , Any ]:
163
161
"""
164
162
Updates the headers of the Celery task with the tracing information
165
163
and eventually Sentry Crons monitoring information for beat tasks.
@@ -233,20 +231,16 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
233
231
234
232
235
233
class NoOpMgr :
236
- def __enter__ (self ):
237
- # type: () -> None
234
+ def __enter__ (self ) -> None :
238
235
return None
239
236
240
- def __exit__ (self , exc_type , exc_value , traceback ):
241
- # type: (Any, Any, Any) -> None
237
+ def __exit__ (self , exc_type : Any , exc_value : Any , traceback : Any ) -> None :
242
238
return None
243
239
244
240
245
- def _wrap_task_run (f ):
246
- # type: (F) -> F
241
+ def _wrap_task_run (f : F ) -> F :
247
242
@wraps (f )
248
- def apply_async (* args , ** kwargs ):
249
- # type: (*Any, **Any) -> Any
243
+ def apply_async (* args : Any , ** kwargs : Any ) -> Any :
250
244
# Note: kwargs can contain headers=None, so no setdefault!
251
245
# Unsure which backend though.
252
246
integration = sentry_sdk .get_client ().get_integration (CeleryIntegration )
@@ -262,15 +256,15 @@ def apply_async(*args, **kwargs):
262
256
return f (* args , ** kwargs )
263
257
264
258
if isinstance (args [0 ], Task ):
265
- task_name = args [0 ].name # type: str
259
+ task_name : str = args [0 ].name
266
260
elif len (args ) > 1 and isinstance (args [1 ], str ):
267
261
task_name = args [1 ]
268
262
else :
269
263
task_name = "<unknown Celery task>"
270
264
271
265
task_started_from_beat = sentry_sdk .get_isolation_scope ()._name == "celery-beat"
272
266
273
- span_mgr = (
267
+ span_mgr : Union [ Span , NoOpMgr ] = (
274
268
sentry_sdk .start_span (
275
269
op = OP .QUEUE_SUBMIT_CELERY ,
276
270
name = task_name ,
@@ -279,7 +273,7 @@ def apply_async(*args, **kwargs):
279
273
)
280
274
if not task_started_from_beat
281
275
else NoOpMgr ()
282
- ) # type: Union[Span, NoOpMgr]
276
+ )
283
277
284
278
with span_mgr as span :
285
279
kwargs ["headers" ] = _update_celery_task_headers (
@@ -290,8 +284,7 @@ def apply_async(*args, **kwargs):
290
284
return apply_async # type: ignore
291
285
292
286
293
- def _wrap_tracer (task , f ):
294
- # type: (Any, F) -> F
287
+ def _wrap_tracer (task : Any , f : F ) -> F :
295
288
296
289
# Need to wrap tracer for pushing the scope before prerun is sent, and
297
290
# popping it after postrun is sent.
@@ -301,8 +294,7 @@ def _wrap_tracer(task, f):
301
294
# crashes.
302
295
@wraps (f )
303
296
@ensure_integration_enabled (CeleryIntegration , f )
304
- def _inner (* args , ** kwargs ):
305
- # type: (*Any, **Any) -> Any
297
+ def _inner (* args : Any , ** kwargs : Any ) -> Any :
306
298
with isolation_scope () as scope :
307
299
scope ._name = "celery"
308
300
scope .clear_breadcrumbs ()
@@ -333,8 +325,7 @@ def _inner(*args, **kwargs):
333
325
return _inner # type: ignore
334
326
335
327
336
- def _set_messaging_destination_name (task , span ):
337
- # type: (Any, Span) -> None
328
+ def _set_messaging_destination_name (task : Any , span : Span ) -> None :
338
329
"""Set "messaging.destination.name" tag for span"""
339
330
with capture_internal_exceptions ():
340
331
delivery_info = task .request .delivery_info
@@ -346,8 +337,7 @@ def _set_messaging_destination_name(task, span):
346
337
span .set_attribute (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
347
338
348
339
349
- def _wrap_task_call (task , f ):
350
- # type: (Any, F) -&g
1241
t; F
340
+ def _wrap_task_call (task : Any , f : F ) -> F :
351
341
352
342
# Need to wrap task call because the exception is caught before we get to
353
343
# see it. Also celery's reported stacktrace is untrustworthy.
@@ -358,8 +348,7 @@ def _wrap_task_call(task, f):
358
348
# to add @functools.wraps(f) here.
359
349
# https://github.com/getsentry/sentry-python/issues/421
360
350
@ensure_integration_enabled (CeleryIntegration , f )
361
- def _inner (* args , ** kwargs ):
362
- # type: (*Any, **Any) -> Any
351
+ def _inner (* args : Any , ** kwargs : Any ) -> Any :
363
352
try :
364
353
with sentry_sdk .start_span (
365
354
op = OP .QUEUE_PROCESS ,
@@ -409,14 +398,12 @@ def _inner(*args, **kwargs):
409
398
return _inner # type: ignore
410
399
411
400
412
- def _patch_build_tracer ():
413
- # type: () -> None
401
+ def _patch_build_tracer () -> None :
414
402
import celery .app .trace as trace # type: ignore
415
403
416
404
original_build_tracer = trace .build_tracer
417
405
418
- def sentry_build_tracer (name , task , * args , ** kwargs ):
419
- # type: (Any, Any, *Any, **Any) -> Any
406
+ def sentry_build_tracer (name : Any , task : Any , * args : Any , ** kwargs : Any ) -> Any :
420
407
if not getattr (task , "_sentry_is_patched" , False ):
421
408
# determine whether Celery will use __call__ or run and patch
422
409
# accordingly
@@ -435,29 +422,25 @@ def sentry_build_tracer(name, task, *args, **kwargs):
435
422
trace .build_tracer = sentry_build_tracer
436
423
437
424
438
- def _patch_task_apply_async ():
439
- # type: () -> None
425
+ def _patch_task_apply_async () -> None :
440
426
Task .apply_async = _wrap_task_run (Task .apply_async )
441
427
442
428
443
- def _patch_celery_send_task ():
444
- # type: () -> None
429
+ def _patch_celery_send_task () -> None :
445
430
from celery import Celery
446
431
447
432
Celery .send_task = _wrap_task_run (Celery .send_task )
448
433
449
434
450
- def _patch_worker_exit ():
451
- # type: () -> None
435
+ def _patch_worker_exit () -> None :
452
436
453
437
# Need to flush queue before worker shutdown because a crashing worker will
454
438
# call os._exit
455
439
from billiard .pool import Worker # type: ignore
456
440
457
441
original_workloop = Worker .workloop
458
442
459
- def sentry_workloop (* args , ** kwargs ):
460
- # type: (*Any, **Any) -> Any
443
+ def sentry_workloop (* args : Any , ** kwargs : Any ) -> Any :
461
444
try :
462
445
return original_workloop (* args , ** kwargs )
463
446
finally :
@@ -471,13 +454,11 @@ def sentry_workloop(*args, **kwargs):
471
454
Worker .workloop = sentry_workloop
472
455
473
456
474
- def _patch_producer_publish ():
475
- # type: () -> None
457
+ def _patch_producer_publish () -> None :
476
458
original_publish = Producer .publish
477
459
478
460
@ensure_integration_enabled (CeleryIntegration , original_publish )
479
- def sentry_publish (self , * args , ** kwargs ):
480
- # type: (Producer, *Any, **Any) -> Any
461
+ def sentry_publish (self : Producer , * args : Any , ** kwargs : Any ) -> Any :
481
462
kwargs_headers = kwargs .get ("headers" , {})
482
463
if not isinstance (kwargs_headers , Mapping ):
483
464
# Ensure kwargs_headers is a Mapping, so we can safely call get().
@@ -521,8 +502,7 @@ def sentry_publish(self, *args, **kwargs):
521
502
Producer .publish = sentry_publish
522
503
523
504
524
- def _prepopulate_attributes (task , args , kwargs ):
525
- # type: (Any, *Any, **Any) -> dict[str, str]
505
+ def _prepopulate_attributes (task : Any , args : Any , kwargs : Any ) -> dict [str , str ]:
526
506
attributes = {
527
507
"celery.job.task" : task .name ,
528
508
}
0 commit comments