8000 bpo-46994: Accept explicit contextvars.Context in asyncio create_task() API by asvetlov · Pull Request #31837 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

bpo-46994: Accept explicit contextvars.Context in asyncio create_task() API #31837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
bpo-46994: Accept explicit contextvars.Context in asyncio create_task…
…() API
  • Loading branch information
asvetlov committed Mar 12, 2022
commit 50e6b0123e8686c4bac077d577d48d174b8b6849
11 changes: 9 additions & 2 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ Creating Futures and Tasks

.. versionadded:: 3.5.2

.. method:: loop.create_task(coro, *, name=None)
.. method:: loop.create_task(coro, *, name=None, context=None)

Schedule the execution of a :ref:`coroutine`.
Return a :class:`Task` object.
Expand All @@ -342,17 +342,24 @@ Creating Futures and Tasks
If the *name* argument is provided and not ``None``, it is set as
the name of the task using :meth:`Task.set_name`.

An optional keyword-only *context* argument allows specifying a
custom :class:`contextvars.Context` for the *coro* to run in.
The current context copy is created when no *context* is provided.

.. versionchanged:: 3.8
Added the *name* parameter.

.. versionchanged:: 3.11
Added the *context* parameter.

.. method:: loop.set_task_factory(factory)

Set a task factory that will be used by
:meth:`loop.create_task`.

If *factory* is ``None`` the default task factory will be set.
Otherwise, *factory* must be a *callable* with the signature matching
``(loop, coro)``, where *loop* is a reference to the active
``(loop, coro, context=None)``, where *loop* is a reference to the active
event loop, and *coro* is a coroutine object. The callable
must return a :class:`asyncio.Future`-compatible object.

Expand Down
9 changes: 8 additions & 1 deletion Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,18 @@ Running an asyncio Program
Creating Tasks
==============

.. function:: create_task(coro, *, name=None)
.. function:: create_task(coro, *, name=None, context=None)

Wrap the *coro* :ref:`coroutine <coroutine>` into a :class:`Task`
and schedule its execution. Return the Task object.

If *name* is not ``None``, it is set as the name of the task using
:meth:`Task.set_name`.

An optional keyword-only *context* argument allows specifying a
custom :class:`contextvars.Context` for the *coro* to run in.
The current context copy is created when no *context* is provided.

The task is executed in the loop returned by :func:`get_running_loop`,
:exc:`RuntimeError` is raised if there is no running loop in
current thread.
Expand Down Expand Up @@ -281,6 +285,9 @@ Creating Tasks
.. versionchanged:: 3.8
Added the *name* parameter.

.. versionchanged:: 3.11
Added the *context* parameter.


Sleeping
========
Expand Down
11 changes: 8 additions & 3 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,18 +426,23 @@ def create_future(self):
"""Create a Future object attached to the loop."""
return futures.Future(loop=self)

def create_task(self, coro, *, name=None):
def create_task(self, coro, *, name=None, context=None):
"""Schedule a coroutine object.

Return a task object.
"""
self._check_closed()
if self._task_factory is None:
task = tasks.Task(coro, loop=self, name=name)
task = tasks.Task(coro, loop=self, name=name, context=context)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
if context is None:
# Use legacy API if context is not needed
task = self._task_factory(self, coro)
else:
task = self._task_factory(self, coro, context=context)

tasks._set_task_name(task, name)

return task
Expand Down
2 changes: 1 addition & 1 deletion Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def create_future(self):

# Method scheduling a coroutine object: create a task.

def create_task(self, coro, *, name=None):
def create_task(self, coro, *, name=None, context=None):
raise NotImplementedError

# Methods for interacting with threads.
Expand Down
16 changes: 12 additions & 4 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True

def __init__(self, coro, *, loop=None, name=None):
def __init__(self, coro, *, loop=None, name=None, context=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
Expand All @@ -109,7 +109,10 @@ def __init__(self, coro, *, loop=None, name=None):
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
self._context = contextvars.copy_context()
if context is None:
self._context = contextvars.copy_context()
else:
self._context = context

self._loop.call_soon(self.__step, context=self._context)
_register_task(self)
Expand Down Expand Up @@ -357,13 +360,18 @@ def __wakeup(self, future):
Task = _CTask = _asyncio.Task


def create_task(coro, *, name=None):
def create_task(coro, *, name=None, context=None):
"""Schedule the execution of a coroutine object in a spawn task.

Return a Task object.
"""
loop = events.get_running_loop()
task = loop.create_task(coro)
if context is None:
# Use legacy API if context is not needed
task = loop.create_task(coro)
else:
task = loop.create_task(coro, context=context)

_set_task_name(task, name)
return task

Expand Down
55 changes: 17 additions & 38 deletions Lib/unittest/async_case.py
F438
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextvars
import inspect
import warnings

Expand Down Expand Up @@ -34,7 +35,7 @@ class IsolatedAsyncioTestCase(TestCase):
def __init__(self, methodName='runTest'):
super().__init__(methodName)
self._asyncioTestLoop = None
self._asyncioCallsQueue = None
self._asyncioTestContext = contextvars.copy_context()

async def asyncSetUp(self):
pass
Expand All @@ -58,7 +59,7 @@ def addAsyncCleanup(self, func, /, *args, **kwargs):
self.addCleanup(*(func, *args), **kwargs)

def _callSetUp(self):
self.setUp()
self._asyncioTestContext.run(self.setUp)
self._callAsync(self.asyncSetUp)

def _callTestMethod(self, method):
Expand All @@ -68,64 +69,42 @@ def _callTestMethod(self, method):

def _callTearDown(self):
self._callAsync(self.asyncTearDown)
self.tearDown()
self._asyncioTestContext.run(self.tearDown)

def _callCleanup(self, function, *args, **kwargs):
self._callMaybeAsync(function, *args, **kwargs)

def _callAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
ret = func(*args, **kwargs)
assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
fut = self._asyncioTestLoop.create_future()
self._asyncioCallsQueue.put_nowait((fut, ret))
return self._asyncioTestLoop.run_until_complete(fut)
assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function'
task = self._asyncioTestLoop.create_task(
func(*args, **kwargs),
context=self._asyncioTestContext,
)
return self._asyncioTestLoop.run_until_complete(task)

def _callMaybeAsync(self, func, /, *args, **kwargs):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
ret = func(*args, **kwargs)
if inspect.isawaitable(ret):
fut = self._asyncioTestLoop.create_future()
self._asyncioCallsQueue.put_nowait((fut, ret))
return self._asyncioTestLoop.run_until_complete(fut)
if inspect.iscoroutinefunction(func):
task = self._asyncioTestLoop.create_task(
func(*args, **kwargs),
context=self._asyncioTestContext,
)
return self._asyncioTestLoop.run_until_complete(task)
else:
return ret

async def _asyncioLoopRunner(self, fut):
self._asyncioCallsQueue = queue = asyncio.Queue()
fut.set_result(None)
while True:
query = await queue.get()
queue.task_done()
if query is None:
return
fut, awaitable = query
try:
ret = await awaitable
if not fut.cancelled():
fut.set_result(ret)
except (SystemExit, KeyboardInterrupt):
raise
except (BaseException, asyncio.CancelledError) as ex:
if not fut.cancelled():
fut.set_exception(ex)
return self._asyncioTestContext.run(func, *args, **kwargs)

def _setupAsyncioLoop(self):
assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
self._asyncioTestLoop = loop
fut = loop.create_future()
self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
loop.run_until_complete(fut)

def _tearDownAsyncioLoop(self):
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
loop = self._asyncioTestLoop
self._asyncioTestLoop = None
self._asyncioCallsQueue.put_nowait(None)
loop.run_until_complete(self._asyncioCallsQueue.join())

try:
# cancel all tasks
Expand Down
18 changes: 18 additions & 0 deletions Lib/unittest/test/test_async_case.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextvars
import unittest
from test import support

Expand All @@ -11,6 +12,9 @@ def tearDownModule():
asyncio.set_event_loop_policy(None)


VAR = contextvars.ContextVar('VAR', default=())


class TestAsyncCase(unittest.TestCase):
maxDiff = None

Expand All @@ -24,22 +28,26 @@ class Test(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.assertEqual(events, [])
events.append('setUp')
VAR.set(VAR.get() + ('setUp',))

async def asyncSetUp(self):
self.assertEqual(events, ['setUp'])
events.append('asyncSetUp')
VAR.set(VAR.get() + ('asyncSetUp',))
self.addAsyncCleanup(self.on_cleanup1)

async def test_func(self):
self.assertEqual(events, ['setUp',
'asyncSetUp'])
events.append('test')
VAR.set(VAR.get() + ('test',))
self.addAsyncCleanup(self.on_cleanup2)

async def asyncTearDown(self):
self.assertEqual(events, ['setUp',
'asyncSetUp',
'test'])
VAR.set(VAR.get() + ('asyncTearDown',))
events.append('asyncTearDown')
10000
def tearDown(self):
Expand All @@ -48,6 +56,7 @@ def tearDown(self):
'test',
'asyncTearDown'])
events.append('tearDown')
VAR.set(VAR.get() + ('tearDown',))

async def on_cleanup1(self):
self.assertEqual(events, ['setUp',
Expand All @@ -57,6 +66,9 @@ async def on_cleanup1(self):
'tearDown',
'cleanup2'])
events.append('cleanup1')
VAR.set(VAR.get() + ('cleanup1',))
nonlocal cvar
cvar = VAR.get()

async def on_cleanup2(self):
self.assertEqual(events, ['setUp',
Expand All @@ -65,22 +77,28 @@ async def on_cleanup2(self):
'asyncTearDown',
'tearDown'])
events.append('cleanup2')
VAR.set(VAR.get() + ('cleanup2',))

events = []
cvar = ()
test = Test("test_func")
result = test.run()
self.assertEqual(result.errors, [])
self.assertEqual(result.failures, [])
expected = ['setUp', 'asyncSetUp', 'test',
'asyncTearDown', 'tearDown', 'cleanup2', 'cleanup1']
self.assertEqual(events, expected)
self.assertEqual(cvar, tuple(expected))

events = []
cvar = ()
test = Test("test_func")
test.debug()
self.assertEqual(events, expected)
self.assertEqual(cvar, tuple(expected))
test.doCleanups()
self.assertEqual(events, expected)
self.assertEqual(cvar, tuple(expected))

def test_exception_in_setup(self):
class Test(unittest.IsolatedAsyncioTestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Accept explicit contextvars.Context in :func:`asyncio.create_task` and
:meth:`asyncio.loop.create_task`.
15 changes: 10 additions & 5 deletions Modules/_asynciomodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -2003,14 +2003,15 @@ _asyncio.Task.__init__
*
loop: object = None
name: object = None
context: object = None

A coroutine wrapped in a Future.
[clinic start generated code]*/

static int
_asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
PyObject *name)
/*[clinic end generated code: output=88b12b83d570df50 input=352a3137fe60091d]*/
PyObject *name, PyObject *context)
/*[clinic end generated code: output=49ac96fe33d0e5c7 input=924522490c8ce825]*/
{
if (future_init((FutureObj*)self, loop)) {
return -1;
Expand All @@ -2028,9 +2029,13 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
return -1;
}

Py_XSETREF(self->task_context, PyContext_CopyCurrent());
if (self->task_context == NULL) {
return -1;
if (context != NULL) {
self->task_context = Py_NewRef(context);
} else {
Py_XSETREF(self->task_context, PyContext_CopyCurrent());
if (self->task_context == NULL) {
return -1;
}
}

Py_CLEAR(self->task_fut_waiter);
Expand Down
Loading
0