8000 Migrate typing for integrations - part 5 (#4534) · getsentry/sentry-python@4c8b4e9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 4c8b4e9

Browse files
authored
Migrate typing for integrations - part 5 (#4534)
1 parent 66d06f5 commit 4c8b4e9

29 files changed

+440
-481
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,3 +214,6 @@ exclude = [
214214
"grpc_test_service_pb2.py",
215215
"grpc_test_service_pb2_grpc.py",
216216
]
217+
per-file-ignores = [
218+
"sentry_sdk/integrations/spark/*:N802,N803",
219+
]

sentry_sdk/integrations/celery/__init__.py

Lines changed: 36 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from __future__ import annotations
12
import sys
23
from collections.abc import Mapping
34
from functools import wraps
@@ -62,11 +63,10 @@ class CeleryIntegration(Integration):
6263

6364
def __init__(
6465
self,
65-
propagate_traces=True,
66-
monitor_beat_tasks=False,
67-
exclude_beat_tasks=None,
68-
):
69-
# type: (bool, bool, Optional[List[str]]) -> None
66+
propagate_traces: bool = True,
67+
monitor_beat_tasks: bool = False,
68+
exclude_beat_tasks: Optional[List[str]] = None,
69+
) -> None:
7070
self.propagate_traces = propagate_traces
7171
self.monitor_beat_tasks = monitor_beat_tasks
7272
self.exclude_beat_tasks = exclude_beat_tasks
@@ -76,8 +76,7 @@ def __init__(
7676
_setup_celery_beat_signals(monitor_beat_tasks)
7777

7878
@staticmethod
79-
def setup_once():
80-
# type: () -> None
79+
def setup_once() -> None:
8180
_check_minimum_version(CeleryIntegration, CELERY_VERSION)
8281

8382
_patch_build_tracer()
@@ -97,16 +96,14 @@ def setup_once():
9796
ignore_logger("celery.redirected")
9897

9998

100-
def _set_status(status):
101-
# type: (str) -> None
99+
def _set_status(status: str) -> None:
102100
with capture_internal_exceptions():
103101
span = sentry_sdk.get_current_span()
104102
if span is not None:
105103
span.set_status(status)
106104

107105

108-
def _capture_exception(task, exc_info):
109-
# type: (Any, ExcInfo) -> None
106+
def _capture_exception(task: Any, exc_info: ExcInfo) -> None:
110107
client = sentry_sdk.get_client()
111108
if client.get_integration(CeleryIntegration) is None:
112109
return
@@ -129,10 +126,10 @@ def _capture_exception(task, exc_info):
129126
sentry_sdk.capture_event(event, hint=hint)
130127

131128

132-
def _make_event_processor(task, uuid, args, kwargs, request=None):
133-
# type: (Any, Any, Any, Any, Optional[Any]) -> EventProcessor
134-
def event_processor(event, hint):
135-
# type: (Event, Hint) -> Optional[Event]
129+
def _make_event_processor(
130+
task: Any, uuid: Any, args: Any, kwargs: Any, request: Optional[Any] = None
131+
) -> EventProcessor:
132+
def event_processor(event: Event, hint: Hint) -> Optional[Event]:
136133

137134
with capture_internal_exceptions():
138135
tags = event.setdefault("tags", {})
@@ -158,8 +155,9 @@ def event_processor(event, hint):
158155
return event_processor
159156

160157

161-
def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
162-
# type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any]
158+
def _update_celery_task_headers(
159+
original_headers: dict[str, Any], span: Optional[Span], monitor_beat_tasks: bool
160+
) -> dict[str, Any]:
163161
"""
164162
Updates the headers of the Celery task with the tracing information
165163
and eventually Sentry Crons monitoring information for beat tasks.
@@ -233,20 +231,16 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
233231

234232

235233
class NoOpMgr:
236-
def __enter__(self):
237-
# type: () -> None
234+
def __enter__(self) -> None:
238235
return None
239236

240-
def __exit__(self, exc_type, exc_value, traceback):
241-
# type: (Any, Any, Any) -> None
237+
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
242238
return None
243239

244240

245-
def _wrap_task_run(f):
246-
# type: (F) -> F
241+
def _wrap_task_run(f: F) -> F:
247242
@wraps(f)
248-
def apply_async(*args, **kwargs):
249-
# type: (*Any, **Any) -> Any
243+
def apply_async(*args: Any, **kwargs: Any) -> Any:
250244
# Note: kwargs can contain headers=None, so no setdefault!
251245
# Unsure which backend though.
252246
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
@@ -262,15 +256,15 @@ def apply_async(*args, **kwargs):
262256
return f(*args, **kwargs)
263257

264258
if isinstance(args[0], Task):
265-
task_name = args[0].name # type: str
259+
task_name: str = args[0].name
266260
elif len(args) > 1 and isinstance(args[1], str):
267261
task_name = args[1]
268262
else:
269263
task_name = "<unknown Celery task>"
270264

271265
task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat"
272266

273-
span_mgr = (
267+
span_mgr: Union[Span, NoOpMgr] = (
274268
sentry_sdk.start_span(
275269
op=OP.QUEUE_SUBMIT_CELERY,
276270
name=task_name,
@@ -279,7 +273,7 @@ def apply_async(*args, **kwargs):
279273
)
280274
if not task_started_from_beat
281275
else NoOpMgr()
282-
) # type: Union[Span, NoOpMgr]
276+
)
283277

284278
with span_mgr as span:
285279
kwargs["headers"] = _update_celery_task_headers(
@@ -290,8 +284,7 @@ def apply_async(*args, **kwargs):
290284
return apply_async # type: ignore
291285

292286

293-
def _wrap_tracer(task, f):
294-
# type: (Any, F) -> F
287+
def _wrap_tracer(task: Any, f: F) -> F:
295288

296289
# Need to wrap tracer for pushing the scope before prerun is sent, and
297290
# popping it after postrun is sent.
@@ -301,8 +294,7 @@ def _wrap_tracer(task, f):
301294
# crashes.
302295
@wraps(f)
303296
@ensure_integration_enabled(CeleryIntegration, f)
304-
def _inner(*args, **kwargs):
305-
# type: (*Any, **Any) -> Any
297+
def _inner(*args: Any, **kwargs: Any) -> Any:
306298
with isolation_scope() as scope:
307299
scope._name = "celery"
308300
scope.clear_breadcrumbs()
@@ -333,8 +325,7 @@ def _inner(*args, **kwargs):
333325
return _inner # type: ignore
334326

335327

336-
def _set_messaging_destination_name(task, span):
337-
# type: (Any, Span) -> None
328+
def _set_messaging_destination_name(task: Any, span: Span) -> None:
338329
"""Set "messaging.destination.name" tag for span"""
339330
with capture_internal_exceptions():
340331
delivery_info = task.request.delivery_info
@@ -346,8 +337,7 @@ def _set_messaging_destination_name(task, span):
346337
span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
347338

348339

349-
def _wrap_task_call(task, f):
350-
# type: (Any, F) -&g 1241 t; F
340+
def _wrap_task_call(task: Any, f: F) -> F:
351341

352342
# Need to wrap task call because the exception is caught before we get to
353343
# see it. Also celery's reported stacktrace is untrustworthy.
@@ -358,8 +348,7 @@ def _wrap_task_call(task, f):
358348
# to add @functools.wraps(f) here.
359349
# https://github.com/getsentry/sentry-python/issues/421
360350
@ensure_integration_enabled(CeleryIntegration, f)
361-
def _inner(*args, **kwargs):
362-
# type: (*Any, **Any) -> Any
351+
def _inner(*args: Any, **kwargs: Any) -> Any:
363352
try:
364353
with sentry_sdk.start_span(
365354
op=OP.QUEUE_PROCESS,
@@ -409,14 +398,12 @@ def _inner(*args, **kwargs):
409398
return _inner # type: ignore
410399

411400

412-
def _patch_build_tracer():
413-
# type: () -> None
401+
def _patch_build_tracer() -> None:
414402
import celery.app.trace as trace # type: ignore
415403

416404
original_build_tracer = trace.build_tracer
417405

418-
def sentry_build_tracer(name, task, *args, **kwargs):
419-
# type: (Any, Any, *Any, **Any) -> Any
406+
def sentry_build_tracer(name: Any, task: Any, *args: Any, **kwargs: Any) -> Any:
420407
if not getattr(task, "_sentry_is_patched", False):
421408
# determine whether Celery will use __call__ or run and patch
422409
# accordingly
@@ -435,29 +422,25 @@ def sentry_build_tracer(name, task, *args, **kwargs):
435422
trace.build_tracer = sentry_build_tracer
436423

437424

438-
def _patch_task_apply_async():
439-
# type: () -> None
425+
def _patch_task_apply_async() -> None:
440426
Task.apply_async = _wrap_task_run(Task.apply_async)
441427

442428

443-
def _patch_celery_send_task():
444-
# type: () -> None
429+
def _patch_celery_send_task() -> None:
445430
from celery import Celery
446431

447432
Celery.send_task = _wrap_task_run(Celery.send_task)
448433

449434

450-
def _patch_worker_exit():
451-
# type: () -> None
435+
def _patch_worker_exit() -> None:
452436

453437
# Need to flush queue before worker shutdown because a crashing worker will
454438
# call os._exit
455439
from billiard.pool import Worker # type: ignore
456440

457441
original_workloop = Worker.workloop
458442

459-
def sentry_workloop(*args, **kwargs):
460-
# type: (*Any, **Any) -> Any
443+
def sentry_workloop(*args: Any, **kwargs: Any) -> Any:
461444
try:
462445
return original_workloop(*args, **kwargs)
463446
finally:
@@ -471,13 +454,11 @@ def sentry_workloop(*args, **kwargs):
471454
Worker.workloop = sentry_workloop
472455

473456

474-
def _patch_producer_publish():
475-
# type: () -> None
457+
def _patch_producer_publish() -> None:
476458
original_publish = Producer.publish
477459

478460
@ensure_integration_enabled(CeleryIntegration, original_publish)
479-
def sentry_publish(self, *args, **kwargs):
480-
# type: (Producer, *Any, **Any) -> Any
461+
def sentry_publish(self: Producer, *args: Any, **kwargs: Any) -> Any:
481462
kwargs_headers = kwargs.get("headers", {})
482463
if not isinstance(kwargs_headers, Mapping):
483464
# Ensure kwargs_headers is a Mapping, so we can safely call get().
@@ -521,8 +502,7 @@ def sentry_publish(self, *args, **kwargs):
521502
Producer.publish = sentry_publish
522503

523504

524-
def _prepopulate_attributes(task, args, kwargs):
525-
# type: (Any, *Any, **Any) -> dict[str, str]
505+
def _prepopulate_attributes(task: Any, args: Any, kwargs: Any) -> dict[str, str]:
526506
attributes = {
527507
"celery.job.task": task.name,
528508
}

sentry_sdk/integrations/celery/beat.py

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from __future__ import annotations
12
import sentry_sdk
23
from sentry_sdk.crons import capture_checkin, MonitorStatus
34
from sentry_sdk.integrations import DidNotEnable
@@ -42,8 +43,7 @@
4243
RedBeatScheduler = None
4344

4445

45-
def _get_headers(task):
46-
# type: (Task) -> dict[str, Any]
46+
def _get_headers(task: Task) -> dict[str, Any]:
4747
headers = task.request.get("headers") or {}
4848

4949
# flatten nested headers
@@ -56,12 +56,13 @@ def _get_headers(task):
5656
return headers
5757

5858

59-
def _get_monitor_config(celery_schedule, app, monitor_name):
60-
# type: (Any, Celery, str) -> MonitorConfig
61-
monitor_config = {} # type: MonitorConfig
62-
schedule_type = None # type: Optional[MonitorConfigScheduleType]
63-
schedule_value = None # type: Optional[Union[str, int]]
64-
schedule_unit = None # type: Optional[MonitorConfigScheduleUnit]
59+
def _get_monitor_config(
60+
celery_schedule: Any, app: Celery, monitor_name: str
61+
) -> MonitorConfig:
62+
monitor_config: MonitorConfig = {}
63+
schedule_type: Optional[MonitorConfigScheduleType] = None
64+
schedule_value: Optional[Union[str, int]] = None
65+
schedule_unit: Optional[MonitorConfigScheduleUnit] = None
6566

6667
if isinstance(celery_schedule, crontab):
6768
schedule_type = "crontab"
@@ -113,8 +114,11 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
113114
return monitor_config
114115

115116

116-
def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
117-
# type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
117+
def _apply_crons_data_to_schedule_entry(
118+
scheduler: Any,
119+
schedule_entry: Any,
120+
integration: sentry_sdk.integrations.celery.CeleryIntegration,
121+
) -> None:
118122
"""
119123
Add Sentry Crons information to the schedule_entry headers.
120124
"""
@@ -158,8 +162,7 @@ def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
158162
schedule_entry.options["headers"] = headers
159163

160164

161-
def _wrap_beat_scheduler(original_function):
162-
# type: (Callable[..., Any]) -> Callable[..., Any]
165+
def _wrap_beat_scheduler(original_function: Callable[..., Any]) -> Callable[..., Any]:
163166
"""
164167
Makes sure that:
165168
- a new Sentry trace is started for each task started by Celery Beat and
@@ -178,8 +181,7 @@ def _wrap_beat_scheduler(original_function):
178181

179182
from sentry_sdk.integrations.celery import CeleryIntegration
180183

181-
def sentry_patched_scheduler(*args, **kwargs):
182-
# type: (*Any, **Any) -> None
184+
def sentry_patched_scheduler(*args: Any, **kwargs: Any) -> None:
183185
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
184186
if integration is None:
185187
return original_function(*args, **kwargs)
@@ -197,29 +199,25 @@ def sentry_patched_scheduler(*args, **kwargs):
197199
return sentry_patched_scheduler
198200

199201

200-
def _patch_beat_apply_entry():
201-
# type: () -> None
202+
def _patch_beat_apply_entry() -> None:
202203
Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
203204

204205

205-
def _patch_redbeat_apply_async():
206-
# type: () -> None
206+
def _patch_redbeat_apply_async() -> None:
207207
if RedBeatScheduler is None:
208208
return
209209

210210
RedBeatScheduler.apply_async = _wrap_beat_scheduler(RedBeatScheduler.apply_async)
211211

212212

213-
def _setup_celery_beat_signals(monitor_beat_tasks):
214-
# type: (bool) -> None
213+
def _setup_celery_beat_signals(monitor_beat_tasks: bool) -> None:
215214
if monitor_beat_tasks:
216215
task_success.connect(crons_task_success)
217216
task_failure.connect(crons_task_failure)
218217
task_retry.connect(crons_task_retry)
219218

220219

221-
def crons_task_success(sender, **kwargs):
222-
# type: (Task, dict[Any, Any]) -> None
220+
def crons_task_success(sender: Task, **kwargs: dict[Any, Any]) -> None:
223221
logger.debug("celery_task_success %s", sender)
224222
headers = _get_headers(sender)
225223

@@ -243,8 +241,7 @@ def crons_task_success(sender, **kwargs):
243241
)
244242

245243

246-
def crons_task_failure(sender, **kwargs):
247-
# type: (Task, dict[Any, Any]) -> None
244+
def crons_task_failure(sender: Task, **kwargs: dict[Any, Any]) -> None:
248245
logger.debug("celery_task_failure %s", sender)
249246
headers = _get_headers(sender)
250247

@@ -268,8 +265,7 @@ def crons_task_failure(sender, **kwargs):
268265
)
269266

270267

271-
def crons_task_retry(sender, **kwargs):
272-
# type: (Task, dict[Any, Any]) -> None
268+
def crons_task_retry(sender: Task, **kwargs: dict[Any, Any]) -> None:
273269
logger.debug("celery_task_retry %s", sender)
274270
headers = _get_headers(sender)
275271

0 commit comments

Comments
 (0)
0