fix(celery): flush queue on worker exit by untitaker · Pull Request #295 · getsentry/sentry-python · GitHub

fix(celery): flush queue on worker exit #295


Merged · 5 commits · Mar 19, 2019
Changes from 1 commit
66 changes: 43 additions & 23 deletions sentry_sdk/integrations/celery.py
@@ -23,12 +23,14 @@ def setup_once():
         def sentry_build_tracer(name, task, *args, **kwargs):
             # Need to patch both methods because older celery sometimes
             # short-circuits to task.run if it thinks it's safe.
-            task.__call__ = _wrap_task_call(task.__call__)
-            task.run = _wrap_task_call(task.run)
+            task.__call__ = _wrap_task_call(task, task.__call__)
+            task.run = _wrap_task_call(task, task.run)
             return _wrap_tracer(task, old_build_tracer(name, task, *args, **kwargs))

         trace.build_tracer = sentry_build_tracer

+        _patch_worker_exit()
+
         # This logger logs every status of every task that ran on the worker.
         # Meaning that every task's breadcrumbs are full of stuff like "Task
         # <foo> raised unexpected <bar>".
@@ -56,14 +58,17 @@ def _inner(*args, **kwargs):
     return _inner


-def _wrap_task_call(f):
+def _wrap_task_call(task, f):
     # Need to wrap task call because the exception is caught before we get to
     # see it. Also celery's reported stacktrace is untrustworthy.
     def _inner(*args, **kwargs):
         try:
             return f(*args, **kwargs)
         except Exception:
-            reraise(*_capture_exception())
+            exc_info = sys.exc_info()
+            with capture_internal_exceptions():
+                _capture_exception(task, exc_info)
+            reraise(*exc_info)

     return _inner

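Context for this hunk: Celery catches task exceptions itself, so the wrapper has to grab sys.exc_info() at the except site, report it, and then reraise the original exc_info; wrapping the reporting call in capture_internal_exceptions() keeps a bug in the SDK's own reporting from masking the task's real exception. A minimal Python 3 sketch of the same pattern, with report() as a hypothetical stand-in for _capture_exception:

import sys
from functools import wraps

def report(exc_info):
    # Stand-in for _capture_exception(task, exc_info).
    print("captured:", exc_info[0].__name__)

def wrap_task_call(f):
    @wraps(f)
    def _inner(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            try:
                report(exc_info)
            except Exception:
                pass  # never let a reporting error shadow the task's exception
            raise  # Python 3: bare raise re-raises with the original traceback
    return _inner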
@@ -82,15 +87,6 @@ def event_processor(event, hint):
         }

         if "exc_info" in hint:
-            with capture_internal_exceptions():
-                if isinstance(hint["exc_info"][1], Retry):
-                    return None
-
-                if hasattr(task, "throws") and isinstance(
-                    hint["exc_info"][1], task.throws
-                ):
-                    return None
-
             with capture_internal_exceptions():
                 if issubclass(hint["exc_info"][0], SoftTimeLimitExceeded):
                     event["fingerprint"] = [
@@ -104,16 +100,40 @@ def event_processor(event, hint):
     return event_processor


-def _capture_exception():
+def _capture_exception(task, exc_info):
     hub = Hub.current
-    exc_info = sys.exc_info()

-    if hub.get_integration(CeleryIntegration) is not None:
-        event, hint = event_from_exception(
-            exc_info,
-            client_options=hub.client.options,
-            mechanism={"type": "celery", "handled": False},
-        )
-        hub.capture_event(event, hint=hint)
-
-    return exc_info
+    if hub.get_integration(CeleryIntegration) is None:
+        return
+    if isinstance(exc_info[1], Retry):
+        return
+    if hasattr(task, "throws") and isinstance(exc_info[1], task.throws):
+        return
+
+    event, hint = event_from_exception(
+        exc_info,
+        client_options=hub.client.options,
+        mechanism={"type": "celery", "handled": False},
+    )
+
+    hub.capture_event(event, hint=hint)
+
+
+def _patch_worker_exit():
+    # Need to flush queue before worker shutdown because a crashing worker will
+    # call os._exit
+    from billiard.pool import Worker
+
+    old_workloop = Worker.workloop
+
+    def sentry_workloop(*args, **kwargs):
+        try:
+            return old_workloop(*args, **kwargs)
+        finally:
+            with capture_internal_exceptions():
+                hub = Hub.current
+                if hub.get_integration(CeleryIntegration) is not None:
+                    hub.flush()
+
+
+    Worker.workloop = sentry_workloop
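The second half of the hunk is the shutdown fix itself: a billiard pool child ends its life with os._exit(), which skips interpreter shutdown and therefore any atexit handlers, so events queued by a crashing task would be dropped. Patching Worker.workloop and flushing in a finally block is the one place the flush is guaranteed to run. A standalone sketch of why the finally-based hook works where atexit does not (flush_pending and the bare workloop are illustrative names, not SDK or billiard API):

import atexit

def flush_pending():
    # Stand-in for hub.flush(): drain events still sitting in the queue.
    print("flushing pending events")

# atexit alone is insufficient here: os._exit() terminates the process
# without running atexit handlers, so a crashing pool child loses events.
atexit.register(flush_pending)

def workloop():
    # Stand-in for billiard.pool.Worker.workloop: the task crashes.
    raise ZeroDivisionError()

def sentry_workloop(*args, **kwargs):
    try:
        return workloop(*args, **kwargs)
    finally:
        # Runs even on the exception path, just before the child would
        # call os._exit(), so queued events still get flushed.
        flush_pending()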
35 changes: 35 additions & 0 deletions tests/conftest.py
@@ -118,3 +118,38 @@ def append(event):
         return events

     return inner
+
+
+@pytest.fixture
+def capture_events_forksafe(monkeypatch):
+    def inner():
+        events_r, events_w = os.pipe()
+        events_r = os.fdopen(events_r, "rb", 0)
+        events_w = os.fdopen(events_w, "wb", 0)
+
+        test_client = sentry_sdk.Hub.current.client
+        old_capture_event = test_client.transport.capture_event
+
+        def append(event):
+            events_w.write(json.dumps(event).encode("utf-8"))
+            events_w.write(b"\n")
+
+        def flush(timeout=None, callback=None):
+            events_w.write(b"flush\n")
+
+        monkeypatch.setattr(test_client.transport, "capture_event", append)
+        monkeypatch.setattr(test_client, 'flush', flush)
+
+        return EventStreamReader(events_r)
+
+    return inner
+
+
+class EventStreamReader(object):
+    def __init__(self, file):
+        self.file = file
+
+    def read_event(self):
+        return json.loads(self.file.readline().decode("utf-8"))
+
+    def read_flush(self):
+        assert self.file.readline() == b'flush\n'
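This fixture exists because both workers under test fork: an in-memory list monkeypatched into capture_event would only be appended to in the child process, invisible to the asserting parent. The os.pipe() descriptors are inherited across fork(), so the child's capture_event and flush calls surface in the parent as readable lines. A hedged usage sketch (the worker setup is elided; test_crash_is_flushed is a made-up name):

def test_crash_is_flushed(sentry_init, capture_events_forksafe):
    sentry_init()
    events = capture_events_forksafe()

    # ... start a forking worker here and enqueue a job that raises ...

    event = events.read_event()  # blocks until the child writes one event
    exception, = event["exception"]["values"]
    assert exception["type"] == "ZeroDivisionError"
    events.read_flush()  # proves the child flushed its transport before exiting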
47 changes: 42 additions & 5 deletions tests/integrations/celery/test_celery.py
@@ -1,3 +1,5 @@
+import threading
+
 import pytest

 pytest.importorskip("celery")
@@ -6,6 +8,7 @@
 from sentry_sdk.integrations.celery import CeleryIntegration

 from celery import Celery, VERSION
+from celery.bin import worker


 @pytest.fixture
@@ -22,7 +25,7 @@ def init_celery(sentry_init):
     def inner():
         sentry_init(integrations=[CeleryIntegration()])
         celery = Celery(__name__)
-        celery.conf.CELERY_ALWAYS_EAGER = True
+        celery.conf.task_always_eager = True
         return celery

     return inner
@@ -92,11 +95,11 @@ def dummy_task(x, y):
         stack_lengths.append(len(Hub.current._stack))
         return x / y

-    try:
+    if VERSION >= (4,):
         dummy_task.delay(2, 2)
-    except ZeroDivisionError:
-        if VERSION >= (4,):
-            raise
+    else:
+        with pytest.raises(ZeroDivisionError):
+            dummy_task.delay(2, 2)

     assert len(Hub.current._stack) == 1
     if VERSION < (4,):
@@ -139,3 +142,37 @@ def dummy_task(self):

     for e in exceptions:
         assert e["type"] == "ZeroDivisionError"
+
+
+def test_transport_shutdown(request, celery, capture_events_forksafe, tmpdir):
+    events = capture_events_forksafe()
+
+    celery.conf.worker_max_tasks_per_child = 1
+    celery.conf.broker_url = "memory://localhost/"
+    celery.conf.task_always_eager = False
+    celery.conf.result_backend = "file://{}".format(tmpdir.mkdir("celery-results"))
+
+    runs = []
+
+    @celery.task(name="dummy_task", bind=True)
+    def dummy_task(self):
+        runs.append(1)
+        1 / 0
+
+    res = dummy_task.delay()
+
+    w = worker.worker(app=celery)
+    t = threading.Thread(target=w.run)
+    t.daemon = True
+    t.start()
+
+    with pytest.raises(ZeroDivisionError):
+        res.wait()
+
+    assert not runs
+
+    event = events.read_event()
+    exception, = event['exception']['values']
+    assert exception['type'] == 'ZeroDivisionError'
+
+    events.read_flush()
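The celery.conf settings are what steer this test onto the patched code path: task_always_eager = False forces the task through a real worker instead of in-process execution, and worker_max_tasks_per_child = 1 makes the billiard child exit right after the single crashing task, which is exactly when the patched workloop must flush. A sketch of the same knobs on a bare Celery app, assuming the results directory exists (values mirror the test, not production advice):

from celery import Celery

app = Celery(__name__)
app.conf.broker_url = "memory://localhost/"      # in-memory broker, test-only
app.conf.result_backend = "file:///tmp/results"  # lets res.wait() observe the failure
app.conf.task_always_eager = False               # run through a real worker
app.conf.worker_max_tasks_per_child = 1          # recycle the pool child after one task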
22 changes: 5 additions & 17 deletions tests/integrations/rq/test_rq.py
@@ -40,32 +40,20 @@ def test_basic(sentry_init, capture_events):
     }


-def test_transport_shutdown(sentry_init):
+def test_transport_shutdown(sentry_init, capture_events_forksafe):
     sentry_init(integrations=[RqIntegration()])

-    events_r, events_w = os.pipe()
-    events_r = os.fdopen(events_r, "rb", 0)
-    events_w = os.fdopen(events_w, "wb", 0)
-
-    def capture_event(event):
-        events_w.write(json.dumps(event).encode("utf-8"))
-        events_w.write(b"\n")
-
-    def flush(timeout=None, callback=None):
-        events_w.write(b"flush\n")
-
-    Hub.current.client.transport.capture_event = capture_event
-    Hub.current.client.flush = flush
+    events = capture_events_forksafe()

     queue = rq.Queue(connection=FakeStrictRedis())
     worker = rq.Worker([queue], connection=queue.connection)

     queue.enqueue(crashing_job, foo=42)
     worker.work(burst=True)

-    event = events_r.readline()
-    event = json.loads(event.decode("utf-8"))
+    event = events.read_event()
+    events.read_flush()

     exception, = event["exception"]["values"]
     assert exception["type"] == "ZeroDivisionError"
-
-    assert events_r.readline() == b"flush\n"