From 359c228b9fd322e127444f79c1e1da230979d457 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Wed, 7 Aug 2019 12:19:09 -0700 Subject: [PATCH 1/2] feat: Capture spans for Celery's apply_async --- sentry_sdk/integrations/celery.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index a0e23ae85f..83dad7fe86 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -77,7 +77,11 @@ def apply_async(*args, **kwargs): headers[key] = value if headers is not None: kwargs["headers"] = headers - return f(*args, **kwargs) + + with hub.span(op="celery", description=task.name): + return f(*args, **kwargs) + else: + return f(*args, **kwargs) return apply_async From efce9e3dd9334d522295fe7f9db602593b6ea9df Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 8 Aug 2019 11:15:16 +0200 Subject: [PATCH 2/2] test: Add basic test for submission span --- sentry_sdk/integrations/celery.py | 2 +- tests/integrations/celery/test_celery.py | 97 +++++++++++++++++++----- 2 files changed, 79 insertions(+), 20 deletions(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 83dad7fe86..a43694a6be 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -78,7 +78,7 @@ def apply_async(*args, **kwargs): if headers is not None: kwargs["headers"] = headers - with hub.span(op="celery", description=task.name): + with hub.span(op="celery.submit", description=task.name): return f(*args, **kwargs) else: return f(*args, **kwargs) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index c9a9bae3f1..a6818c5c5f 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -6,6 +6,7 @@ from sentry_sdk import Hub, configure_scope from sentry_sdk.integrations.celery import CeleryIntegration +from sentry_sdk._compat import text_type from celery import Celery, VERSION from celery.bin import worker @@ -22,8 +23,11 @@ def inner(signal, f): @pytest.fixture def init_celery(sentry_init): - def inner(propagate_traces=True): - sentry_init(integrations=[CeleryIntegration(propagate_traces=propagate_traces)]) + def inner(propagate_traces=True, **kwargs): + sentry_init( + integrations=[CeleryIntegration(propagate_traces=propagate_traces)], + **kwargs + ) celery = Celery(__name__) if VERSION < (4,): celery.conf.CELERY_ALWAYS_EAGER = True @@ -39,22 +43,30 @@ def celery(init_celery): return init_celery() -@pytest.mark.parametrize( - "invocation,expected_context", - [ - [lambda task, x, y: task.delay(x, y), {"args": [1, 0], "kwargs": {}}], - [lambda task, x, y: task.apply_async((x, y)), {"args": [1, 0], "kwargs": {}}], - [ - lambda task, x, y: task.apply_async(args=(x, y)), - {"args": [1, 0], "kwargs": {}}, - ], - [ - lambda task, x, y: task.apply_async(kwargs=dict(x=x, y=y)), - {"args": [], "kwargs": {"x": 1, "y": 0}}, - ], - ], +@pytest.fixture( + params=[ + lambda task, x, y: (task.delay(x, y), {"args": [x, y], "kwargs": {}}), + lambda task, x, y: (task.apply_async((x, y)), {"args": [x, y], "kwargs": {}}), + lambda task, x, y: ( + task.apply_async(args=(x, y)), + {"args": [x, y], "kwargs": {}}, + ), + lambda task, x, y: ( + task.apply_async(kwargs=dict(x=x, y=y)), + {"args": [], "kwargs": {"x": x, "y": y}}, + ), + ] ) -def test_simple(capture_events, celery, invocation, expected_context): +def celery_invocation(request): + """ + Invokes a task in multiple ways Celery allows you to (testing our apply_async monkeypatch). + + Currently limited to a task signature of the form foo(x, y) + """ + return request.param + + +def test_simple(capture_events, celery, celery_invocation): events = capture_events() @celery.task(name="dummy_task") @@ -63,8 +75,8 @@ def dummy_task(x, y): return x / y with Hub.current.span() as span: - invocation(dummy_task, 1, 2) - invocation(dummy_task, 1, 0) + celery_invocation(dummy_task, 1, 2) + _, expected_context = celery_invocation(dummy_task, 1, 0) event, = events @@ -81,6 +93,53 @@ def dummy_task(x, y): assert exception["stacktrace"]["frames"][0]["vars"]["foo"] == "42" +@pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"]) +def test_transaction_events(capture_events, init_celery, celery_invocation, task_fails): + celery = init_celery(traces_sample_rate=1.0) + + @celery.task(name="dummy_task") + def dummy_task(x, y): + return x / y + + # XXX: For some reason the first call does not get instrumented properly. + celery_invocation(dummy_task, 1, 1) + + events = capture_events() + + with Hub.current.span(transaction="submission") as span: + celery_invocation(dummy_task, 1, 0 if task_fails else 1) + + if task_fails: + error_event = events.pop(0) + assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + + execution_event, submission_event = events + + assert execution_event["transaction"] == "dummy_task" + assert submission_event["transaction"] == "submission" + + assert execution_event["type"] == submission_event["type"] == "transaction" + assert execution_event["contexts"]["trace"]["trace_id"] == span.trace_id + assert submission_event["contexts"]["trace"]["trace_id"] == span.trace_id + + assert execution_event["spans"] == [] + assert submission_event["spans"] == [ + { + u"data": {}, + u"description": u"dummy_task", + u"op": "celery.submit", + u"parent_span_id": submission_event["contexts"]["trace"]["span_id"], + u"same_process_as_parent": True, + u"span_id": submission_event["spans"][0]["span_id"], + u"start_timestamp": submission_event["spans"][0]["start_timestamp"], + u"tags": {u"error": False}, + u"timestamp": submission_event["spans"][0]["timestamp"], + u"trace_id": text_type(span.trace_id), + } + ] + + def test_no_stackoverflows(celery): """We used to have a bug in the Celery integration where its monkeypatching was repeated for every task invocation, leading to stackoverflows.