feat: Capture spans for Celery's apply_async by dcramer · Pull Request #454 · getsentry/sentry-python · GitHub
[go: up one dir, main page]

Skip to content

feat: Capture spans for Celery's apply_async #454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits on Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ def apply_async(*args, **kwargs):
headers[key] = value
if headers is not None:
kwargs["headers"] = headers
return f(*args, **kwargs)

with hub.span(op="celery.submit", description=task.name):
return f(*args, **kwargs)
else:
return f(*args, **kwargs)

return apply_async

Expand Down
97 changes: 78 additions & 19 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from sentry_sdk import Hub, configure_scope
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk._compat import text_type

from celery import Celery, VERSION
from celery.bin import worker
Expand All @@ -22,8 +23,11 @@ def inner(signal, f):

@pytest.fixture
def init_celery(sentry_init):
def inner(propagate_traces=True):
sentry_init(integrations=[CeleryIntegration(propagate_traces=propagate_traces)])
def inner(propagate_traces=True, **kwargs):
sentry_init(
integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
**kwargs
)
celery = Celery(__name__)
if VERSION < (4,):
celery.conf.CELERY_ALWAYS_EAGER = True
Expand All @@ -39,22 +43,30 @@ def celery(init_celery):
return init_celery()


@pytest.mark.parametrize(
"invocation,expected_context",
[
[lambda task, x, y: task.delay(x, y), {"args": [1, 0], "kwargs": {}}],
[lambda task, x, y: task.apply_async((x, y)), {"args": [1, 0], "kwargs": {}}],
[
lambda task, x, y: task.apply_async(args=(x, y)),
{"args": [1, 0], "kwargs": {}},
],
[
lambda task, x, y: task.apply_async(kwargs=dict(x=x, y=y)),
{"args": [], "kwargs": {"x": 1, "y": 0}},
],
],
@pytest.fixture(
params=[
lambda task, x, y: (task.delay(x, y), {"args": [x, y], "kwargs": {}}),
lambda task, x, y: (task.apply_async((x, y)), {"args": [x, y], "kwargs": {}}),
lambda task, x, y: (
task.apply_async(args=(x, y)),
{"args": [x, y], "kwargs": {}},
),
lambda task, x, y: (
task.apply_async(kwargs=dict(x=x, y=y)),
{"args": [], "kwargs": {"x": x, "y": y}},
),
]
)
def test_simple(capture_events, celery, invocation, expected_context):
def celery_invocation(request):
"""
Invokes a task in multiple ways Celery allows you to (testing our apply_async monkeypatch).

Currently limited to a task signature of the form foo(x, y)
"""
return request.param


def test_simple(capture_events, celery, celery_invocation):
events = capture_events()

@celery.task(name="dummy_task")
Expand All @@ -63,8 +75,8 @@ def dummy_task(x, y):
return x / y

with Hub.current.span() as span:
invocation(dummy_task, 1, 2)
invocation(dummy_task, 1, 0)
celery_invocation(dummy_task, 1, 2)
_, expected_context = celery_invocation(dummy_task, 1, 0)

event, = events

Expand All @@ -81,6 +93,53 @@ def dummy_task(x, y):
assert exception["stacktrace"]["frames"][0]["vars"]["foo"] == "42"


@pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"])
def test_transaction_events(capture_events, init_celery, celery_invocation, task_fails):
celery = init_celery(traces_sample_rate=1.0)

@celery.task(name="dummy_task")
def dummy_task(x, y):
return x / y

# XXX: For some reason the first call does not get instrumented properly.
celery_invocation(dummy_task, 1, 1)

events = capture_events()

with Hub.current.span(transaction="submission") as span:
celery_invocation(dummy_task, 1, 0 if task_fails else 1)

if task_fails:
error_event = events.pop(0)
assert error_event["contexts"]["trace"]["trace_id"] == span.trace_id
assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError"

execution_event, submission_event = events

assert execution_event["transaction"] == "dummy_task"
assert submission_event["transaction"] == "submission"

assert execution_event["type"] == submission_event["type"] == "transaction"
assert execution_event["contexts"]["trace"]["trace_id"] == span.trace_id
assert submission_event["contexts"]["trace"]["trace_id"] == span.trace_id

assert execution_event["spans"] == []
assert submission_event["spans"] == [
{
u"data": {},
u"description": u"dummy_task",
u"op": "celery.submit",
u"parent_span_id": submission_event["contexts"]["trace"]["span_id"],
u"same_process_as_parent": True,
u"span_id": submission_event["spans"][0]["span_id"],
u"start_timestamp": submission_event["spans"][0]["start_timestamp"],
u"tags": {u"error": False},
u"timestamp": submission_event["spans"][0]["timestamp"],
u"trace_id": text_type(span.trace_id),
}
]


def test_no_stackoverflows(celery):
"""We used to have a bug in the Celery integration where its monkeypatching
was repeated for every task invocation, leading to stackoverflows.
Expand Down