diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h index 6f00eca8de05af..7fdfc7903477de 100644 --- a/Include/internal/pycore_interp.h +++ b/Include/internal/pycore_interp.h @@ -227,6 +227,13 @@ struct _is { PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS]; _PyIndexPool tlbc_indices; #endif + // Per-interpreter list of tasks, any lingering tasks from thread + // states get added here and removed from the corresponding + // thread state's list. + struct llist_node asyncio_tasks_head; + // `asyncio_tasks_lock` is used when tasks are moved + // from thread's list to interpreter's list. + PyMutex asyncio_tasks_lock; // Per-interpreter state for the obmalloc allocator. For the main // interpreter and for all interpreters that don't have their diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h index 8bcb23a6ce9f9d..7484b05d7f2446 100644 --- a/Include/internal/pycore_lock.h +++ b/Include/internal/pycore_lock.h @@ -52,7 +52,7 @@ typedef enum _PyLockFlags { // Lock a mutex with an optional timeout and additional options. See // _PyLockFlags for details. -extern PyLockStatus +extern PyAPI_FUNC(PyLockStatus) _PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags); // Lock a mutex with additional options. See _PyLockFlags for details. diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h index ff3b222b157810..9ec59e60f609ab 100644 --- a/Include/internal/pycore_pystate.h +++ b/Include/internal/pycore_pystate.h @@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime); // Perform a stop-the-world pause for threads in the specified interpreter. // // NOTE: This is a no-op outside of Py_GIL_DISABLED builds. 
-extern void _PyEval_StopTheWorld(PyInterpreterState *interp); -extern void _PyEval_StartTheWorld(PyInterpreterState *interp); +extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp); +extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp); static inline void diff --git a/Include/internal/pycore_tstate.h b/Include/internal/pycore_tstate.h index 74e1452763e56c..932623f54c4260 100644 --- a/Include/internal/pycore_tstate.h +++ b/Include/internal/pycore_tstate.h @@ -24,9 +24,14 @@ typedef struct _PyThreadStateImpl { PyObject *asyncio_running_loop; // Strong reference PyObject *asyncio_running_task; // Strong reference + /* Head of circular linked-list of all tasks which are instances of `asyncio.Task` + or subclasses of it used in `asyncio.all_tasks`. + */ + struct llist_node asyncio_tasks_head; struct _qsbr_thread_state *qsbr; // only used by free-threaded build struct llist_node mem_free_queue; // delayed free queue + #ifdef Py_GIL_DISABLED struct _gc_thread_state gc; struct _mimalloc_thread_state mimalloc; diff --git a/Lib/test/test_asyncio/test_free_threading.py b/Lib/test/test_asyncio/test_free_threading.py index 6da398e77e7797..d0221d87062c5b 100644 --- a/Lib/test/test_asyncio/test_free_threading.py +++ b/Lib/test/test_asyncio/test_free_threading.py @@ -3,7 +3,8 @@ import unittest from threading import Thread from unittest import TestCase - +import weakref +from test import support from test.support import threading_helper threading_helper.requires_working_threading(module=True) @@ -95,6 +96,22 @@ def check(): done.set() runner.join() + def test_task_different_thread_finalized(self) -> None: + task = None + async def func(): + nonlocal task + task = asyncio.current_task() + + thread = Thread(target=lambda: asyncio.run(func())) + thread.start() + thread.join() + wr = weakref.ref(task) + del thread + del task + # task finalization in different thread shouldn't crash + support.gc_collect() + self.assertIsNone(wr()) + def 
test_run_coroutine_threadsafe(self) -> None: results = [] diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index b488fd92aa6817..832215348dad3f 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -67,6 +67,10 @@ typedef struct TaskObj { PyObject *task_name; PyObject *task_context; struct llist_node task_node; +#ifdef Py_GIL_DISABLED + // thread id of the thread where this task was created + uintptr_t task_tid; +#endif } TaskObj; typedef struct { @@ -94,14 +98,6 @@ typedef struct { || PyObject_TypeCheck(obj, state->FutureType) \ || PyObject_TypeCheck(obj, state->TaskType)) -#ifdef Py_GIL_DISABLED -# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex) -# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION() -#else -# define ASYNCIO_STATE_LOCK(state) ((void)state) -# define ASYNCIO_STATE_UNLOCK(state) ((void)state) -#endif - typedef struct _Py_AsyncioModuleDebugOffsets { struct _asyncio_task_object { uint64_t size; @@ -135,9 +131,6 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug) /* State of the _asyncio module */ typedef struct { -#ifdef Py_GIL_DISABLED - PyMutex mutex; -#endif PyTypeObject *FutureIterType; PyTypeObject *TaskStepMethWrapper_Type; PyTypeObject *FutureType; @@ -184,11 +177,6 @@ typedef struct { /* Counter for autogenerated Task names */ uint64_t task_name_counter; - /* Head of circular linked-list of all tasks which are instances of `asyncio.Task` - or subclasses of it. Third party tasks implementations which don't inherit from - `asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet. 
- */ - struct llist_node asyncio_tasks_head; } asyncio_state; static inline asyncio_state * @@ -2181,16 +2169,15 @@ static PyMethodDef TaskWakeupDef = { static void register_task(asyncio_state *state, TaskObj *task) { - ASYNCIO_STATE_LOCK(state); assert(Task_Check(state, task)); if (task->task_node.next != NULL) { // already registered assert(task->task_node.prev != NULL); - goto exit; + return; } - llist_insert_tail(&state->asyncio_tasks_head, &task->task_node); -exit: - ASYNCIO_STATE_UNLOCK(state); + _PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET(); + struct llist_node *head = &tstate->asyncio_tasks_head; + llist_insert_tail(head, &task->task_node); } static int @@ -2199,19 +2186,38 @@ register_eager_task(asyncio_state *state, PyObject *task) return PySet_Add(state->eager_tasks, task); } -static void -unregister_task(asyncio_state *state, TaskObj *task) +static inline void +unregister_task_safe(TaskObj *task) { - ASYNCIO_STATE_LOCK(state); - assert(Task_Check(state, task)); if (task->task_node.next == NULL) { // not registered assert(task->task_node.prev == NULL); - goto exit; + return; } llist_remove(&task->task_node); -exit: - ASYNCIO_STATE_UNLOCK(state); +} + +static void +unregister_task(asyncio_state *state, TaskObj *task) +{ + assert(Task_Check(state, task)); +#ifdef Py_GIL_DISABLED + // check if we are in the same thread + // if so, we can avoid locking + if (task->task_tid == _Py_ThreadId()) { + unregister_task_safe(task); + } + else { + // we are in a different thread + // stop the world then check and remove the task + PyThreadState *tstate = _PyThreadState_GET(); + _PyEval_StopTheWorld(tstate->interp); + unregister_task_safe(task); + _PyEval_StartTheWorld(tstate->interp); + } +#else + unregister_task_safe(task); +#endif } static int @@ -2425,6 +2431,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, } Py_CLEAR(self->task_fut_waiter); +#ifdef Py_GIL_DISABLED + self->task_tid = _Py_ThreadId(); +#endif 
self->task_must_cancel = 0; self->task_log_destroy_pending = 1; self->task_num_cancels_requested = 0; @@ -3968,6 +3977,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop) static inline int add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop) { + assert(PySet_CheckExact(tasks)); PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done)); if (done == NULL) { return -1; @@ -3990,6 +4000,57 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo return 0; } +static inline int +add_tasks_llist(struct llist_node *head, PyListObject *tasks) +{ + struct llist_node *node; + llist_for_each_safe(node, head) { + TaskObj *task = llist_data(node, TaskObj, task_node); + // The linked list holds borrowed references to task + // as such it is possible that the task is concurrently + // deallocated while added to this list. + // To protect against concurrent deallocations, + // we first try to incref the task which would fail + // if it is concurrently getting deallocated in another thread, + // otherwise it gets added to the list. + if (_Py_TryIncref((PyObject *)task)) { + if (_PyList_AppendTakeRef(tasks, (PyObject *)task) < 0) { + // do not call any escaping calls here while the world is stopped. 
+ return -1; + } + } + } + return 0; +} + +static inline int +add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks) +{ +#ifdef Py_GIL_DISABLED + assert(interp->stoptheworld.world_stopped); +#endif + // Start traversing from interpreter's linked list + struct llist_node *head = &interp->asyncio_tasks_head; + + if (add_tasks_llist(head, tasks) < 0) { + return -1; + } + + int ret = 0; + // traverse the task lists of thread states + _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)p; + head = &ts->asyncio_tasks_head; + if (add_tasks_llist(head, tasks) < 0) { + ret = -1; + goto exit; + } + } +exit: + _Py_FOR_EACH_TSTATE_END(interp); + return ret; +} + /*********************** Module **************************/ /*[clinic input] @@ -4028,30 +4089,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) Py_DECREF(loop); return NULL; } - int err = 0; - ASYNCIO_STATE_LOCK(state); - struct llist_node *node; - - llist_for_each_safe(node, &state->asyncio_tasks_head) { - TaskObj *task = llist_data(node, TaskObj, task_node); - // The linked list holds borrowed references to task - // as such it is possible that the task is concurrently - // deallocated while added to this list. - // To protect against concurrent deallocations, - // we first try to incref the task which would fail - // if it is concurrently getting deallocated in another thread, - // otherwise it gets added to the list. - if (_Py_TryIncref((PyObject *)task)) { - if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { - Py_DECREF(tasks); - Py_DECREF(loop); - err = 1; - break; - } - } - } - ASYNCIO_STATE_UNLOCK(state); - if (err) { + PyInterpreterState *interp = PyInterpreterState_Get(); + // Stop the world and traverse the per-thread linked list + // of asyncio tasks for every thread, as well as the + // interpreter's linked list, and add them to `tasks`. 
+ // The interpreter linked list is used for any lingering tasks + // whose thread state has been deallocated while the task was + // still alive. This can happen if a task is referenced by + // a different thread, in which case the task is moved to + // the interpreter's linked list from the thread's linked + // list before deallocation. See PyThreadState_Clear. + // + // The stop-the-world pause is required so that no thread + // modifies its linked list while being iterated here + // in parallel. This design allows for lock-free + // register_task/unregister_task for loops running in parallel + // in different threads (the general case). + _PyEval_StopTheWorld(interp); + int ret = add_tasks_interp(interp, (PyListObject *)tasks); + _PyEval_StartTheWorld(interp); + if (ret < 0) { + // call any escaping calls after starting the world to avoid any deadlocks. + Py_DECREF(tasks); + Py_DECREF(loop); return NULL; } PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks); @@ -4323,7 +4383,6 @@ module_exec(PyObject *mod) { asyncio_state *state = get_asyncio_state(mod); - llist_init(&state->asyncio_tasks_head); #define CREATE_TYPE(m, tp, spec, base) \ do { \ diff --git a/Python/pystate.c b/Python/pystate.c index e6770ef40df740..89a652850e9363 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -643,6 +643,8 @@ init_interpreter(PyInterpreterState *interp, _Py_brc_init_state(interp); #endif llist_init(&interp->mem_free_queue.head); + llist_init(&interp->asyncio_tasks_head); + interp->asyncio_tasks_lock = (PyMutex){0}; for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) { interp->monitors.tools[i] = 0; } @@ -1512,7 +1514,7 @@ init_threadstate(_PyThreadStateImpl *_tstate, tstate->delete_later = NULL; llist_init(&_tstate->mem_free_queue); - + llist_init(&_tstate->asyncio_tasks_head); if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) { // Start in the suspended state if there is an ongoing stop-the-world. 
tstate->state = _Py_THREAD_SUSPENDED; @@ -1692,6 +1694,14 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop); Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task); + + PyMutex_Lock(&tstate->interp->asyncio_tasks_lock); + // merge any lingering tasks from thread state to interpreter's + // tasks list + llist_concat(&tstate->interp->asyncio_tasks_head, + &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head); + PyMutex_Unlock(&tstate->interp->asyncio_tasks_lock); + Py_CLEAR(tstate->dict); Py_CLEAR(tstate->async_exc);