8000 Make it possible to define additional context on the worker · Develop-Python/procrastinate@a4861c4 · GitHub
[go: up one dir, main page]

Skip to content

Commit a4861c4

Browse files
author
Éric Lemoine
committed
Make it possible to define additional context on the worker
1 parent a926139 commit a4861c4

File tree

3 files changed

+54
-10
lines changed

3 files changed

+54
-10
lines changed

procrastinate/app.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,9 @@ async def run_worker_async(self, **kwargs) -> None:
311311
If ``successful`` the worker will only delete successful jobs.
312312
If ``never``, the worker will keep the jobs in the database.
313313
(defaults to ``never``)
314+
additional_context: ``Optional[Dict[str, Any]]``
315+
If set extend the context received by the tasks when ``pass_context`` is set
316+
to ``True`` in the task definition.
314317
"""
315318
self.perform_import_paths()
316319
worker = self._worker(**kwargs)

procrastinate/worker.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import time
55
from enum import Enum
6-
from typing import Dict, Iterable, Optional, Set, Union
6+
from typing import Any, Dict, Iterable, Optional, Set, Union
77

88
from procrastinate import app, exceptions, job_context, jobs, signals, tasks, utils
99

@@ -36,6 +36,7 @@ def __init__(
3636
timeout: float = WORKER_TIMEOUT,
3737
listen_notify: bool = True,
3838
delete_jobs: str = DeleteJobCondition.NEVER.value,
39+
additional_context: Optional[Dict[str, Any]] = None,
3940
):
4041
self.app = app
4142
self.queues = queues
@@ -57,7 +58,10 @@ def __init__(
5758
self.logger = logger
5859

5960
self.base_context: job_context.JobContext = job_context.JobContext(
60-
app=app, worker_name=self.worker_name, worker_queues=self.queues
61+
app=app,
62+
worker_name=self.worker_name,
63+
worker_queues=self.queues,
64+
additional_context=additional_context.copy() if additional_context else {},
6165
)
6266
self.current_contexts: Dict[int, job_context.JobContext] = {}
6367
self.stop_requested = False
@@ -75,6 +79,7 @@ def context_for_worker(
7579
if reset or worker_id not in self.current_contexts:
7680
context = self.base_context
7781
kwargs["worker_id"] = worker_id
82+
kwargs["additional_context"] = self.base_context.additional_context.copy()
7883
else:
7984
context = self.current_contexts[worker_id]
8085

tests/unit/test_worker.py

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ def _(job):
2323
return _
2424

2525

26+
def test_worker_additional_context(app):
27+
worker_obj = worker.Worker(app=app, additional_context={"foo": "bar"})
28+
assert worker_obj.base_context.additional_context == {"foo": "bar"}
29+
30+
2631
async def test_run(test_worker, mocker, caplog):
2732
caplog.set_level("INFO")
2833

@@ -315,17 +320,15 @@ def task_func(test_context, a):
315320
task_name="job",
316321
queue="yay",
317322
)
318-
test_worker = worker.Worker(app, queues=["yay"], name="my_worker")
319-
context = job_context.JobContext(
320-
worker_name="my_worker",
321-
worker_id=3,
322-
worker_queues=["yay"],
323-
job=job,
324-
task=task_func,
323+
test_worker = worker.Worker(
324+
app, queues=["yay"], name="my_worker", additional_context={"foo": "bar"}
325325
)
326-
test_worker.current_contexts[3] = context
326+
context = test_worker.context_for_worker(worker_id=3)
327+
327328
await test_worker.run_job(job=job, worker_id=3)
328329

330+
context = context.evolve(task=task_func)
331+
329332
assert result == [
330333
context,
331334
1,
@@ -519,3 +522,36 @@ def test_context_for_worker_reset(app):
519522
context = test_worker.context_for_worker(worker_id=3, reset=True)
520523

521524
assert context == expected
525+
526+
527+
def test_worker_copy_additional_context(app):
528+
additional_context = {"foo": "bar"}
529+
test_worker = worker.Worker(
530+
app=app,
531+
name="worker",
532+
additional_context=additional_context,
533+
)
534+
535+
# mutate the additional_context object and test that we have the original
536+
# value in the worker
537+
additional_context["foo"] = "baz"
538+
assert test_worker.base_context.additional_context == {"foo": "bar"}
539+
540+
541+
def test_context_for_worker_with_additional_context(app):
542+
additional_context = {"foo": "bar"}
543+
test_worker = worker.Worker(
544+
app=app,
545+
name="worker",
546+
additional_context=additional_context,
547+
)
548+
549+
context1 = test_worker.context_for_worker(worker_id=3)
550+
551+
# mutate the additional_context object for one worker and test that it
552+
# hasn't changed for other workers
553+
context1.additional_context["foo"] = "baz"
554+
555+
context2 = test_worker.context_for_worker(worker_id=4)
556+
557+
assert context2.additional_context == {"foo": "bar"}

0 commit comments

Comments
 (0)
0