8000 fix: Ensure drained queue when RQ kills job (#153) · etherscan-io/sentry-python@2d21404 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2d21404

Browse files
authored
fix: Ensure drained queue when RQ kills job (getsentry#153)
* fix: Ensure drained queue when RQ kills job * fix: callback may be none * ref: Split up testcases * fix: Fix tests * fix: Encoding
1 parent a11f3f3 commit 2d21404

File tree

3 files changed

+70
-32
lines changed

3 files changed

+70
-32
lines changed

sentry_sdk/integrations/rq.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,18 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
2727

2828
with hub.push_scope() as scope:
2929
scope.add_event_processor(_make_event_processor(weakref.ref(job)))
30-
return old_perform_job(self, job, *args, **kwargs)
30+
rv = old_perform_job(self, job, *args, **kwargs)
31+
32+
if self.is_horse:
33+
# We're inside of a forked process and RQ is
34+
# about to call `os._exit`. Make sure that our
35+
# events get sent out.
36+
#
37+
# Closing the client should not affect other jobs since
38+
# we're in a different process
39+
hub.client.close()
40+
41+
return rv
3142

3243
Worker.perform_job = sentry_patched_perform_job
3344

setup.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,19 @@
2424
install_requires=["urllib3", "certifi"],
2525
extras_require={"flask": ["flask>=0.8", "blinker>=1.1"]},
2626
classifiers=[
27-
'Development Status :: 5 - Production/Stable',
28-
'Environment :: Web Environment',
29-
'Intended Audience :: Developers',
30-
'License :: OSI Approved :: BSD License',
31-
'Operating System :: OS Independent',
32-
'Programming Language :: Python',
33-
'Programming Language :: Python :: 2',
34-
'Programming Language :: Python :: 2.7',
35-
'Programming Language :: Python :: 3',
36-
'Programming Language :: Python :: 3.4',
37-
'Programming Language :: Python :: 3.5',
38-
'Programming Language :: Python :: 3.6',
39-
'Programming Language :: Python :: 3.7',
40-
'Topic :: Software Development :: Libraries :: Python Modules',
27+
"Development Status :: 5 - Production/Stable",
28+
"Environment :: Web Environment",
29+
"Intended Audience :: Developers",
30+
"License :: OSI Approved :: BSD License",
31+
"Operating System :: OS Independent",
32+
"Programming Language :: Python",
33+
"Programming Language :: Python :: 2",
34+
"Programming Language :: Python :: 2.7",
35+
"Programming Language :: Python :: 3",
36+
"Programming Language :: Python :: 3.4",
37+
"Programming Language :: Python :: 3.5",
38+
"Programming Language :: Python :: 3.6",
39+
"Programming Language :: Python :: 3.7",
40+
"Topic :: Software Development :: Libraries :: Python Modules",
4141
],
4242
)

tests/integrations/rq/test_rq.py

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,28 @@
11
from sentry_sdk.integrations.rq import RqIntegration
22

3-
import pytest
3+
import os
4+
import json
45

56
from fakeredis import FakeStrictRedis
6-
from rq import SimpleWorker, Queue
7+
import rq
78

8-
9-
@pytest.fixture
10-
def run_job():
11-
queue = Queue(connection=FakeStrictRedis())
12-
worker = SimpleWorker([queue], connection=queue.connection)
13-
14-
def inner(fn, *a, **kw):
15-
job = queue.enqueue(fn, *a, **kw)
16-
worker.work(burst=True)
17-
return job
18-
19-
return inner
9+
from sentry_sdk import Hub
2010

2111

2212
def crashing_job(foo):
2313
1 / 0
2414

2515

26-
def test_basic(sentry_init, capture_events, run_job):
16+
def test_basic(sentry_init, capture_events):
2717
sentry_init(integrations=[RqIntegration()])
28-
2918
events = capture_events()
30-
run_job(crashing_job, foo=42)
19+
20+
queue = rq.Queue(connection=FakeStrictRedis())
21+
worker = rq.SimpleWorker([queue], connection=queue.connection)
22+
23+
queue.enqueue(crashing_job, foo=42)
24+
worker.work(burst=True)
25+
3126
event, = events
3227

3328
exception, = event["exception"]["values"]
@@ -43,3 +38,35 @@ def test_basic(sentry_init, capture_events, run_job):
4338
"job_id": event["extra"]["rq-job"]["job_id"],
4439
"kwargs": {"foo": 42},
4540
}
41+
42+
43+
def test_transport_shutdown(sentry_init):
44+
sentry_init(integrations=[RqIntegration()])
45+
46+
events_r, events_w = os.pipe()
47+
events_r = os.fdopen(events_r, "rb", 0)
48+
events_w = os.fdopen(events_w, "wb", 0)
49+
50+
def capture_event(event):
51+
events_w.write(json.dumps(event).encode("utf-8"))
52+
events_w.write(b"\n")
53+
54+
def shutdown(timeout, callback=None):
55+
events_w.write(b"shutdown\n")
56+
57+
Hub.current.client.transport.capture_event = capture_event
58+
Hub.current.client.transport.shutdown = shutdown
59+
60+
queue = rq.Queue(connection=FakeStrictRedis())
61+
worker = rq.Worker([queue], connection=queue.connection)
62+
63+
queue.enqueue(crashing_job, foo=42)
64+
worker.work(burst=True)
65+
66+
event = events_r.readline()
67+
shutdown = events_r.readline()
68+
event = json.loads(event.decode("utf-8"))
69+
assert shutdown == b"shutdown\n"
70+
71+
exception, = event["exception"]["values"]
72+
assert exception["type"] == "ZeroDivisionError"

0 commit comments

Comments
 (0)
0