8000 gh-128002: use per threads tasks linked list in asyncio by kumaraditya303 · Pull Request #128869 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-128002: use per threads tasks linked list in asyncio #128869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8000
Next Next commit
switch to per thread tasks list
  • Loading branch information
kumaraditya303 committed Jan 15, 2025
commit 1a39d4161bacb8a4f9a88c3b9f1f83d8e77e4e30
2 changes: 2 additions & 0 deletions Include/internal/pycore_tstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ typedef struct _PyThreadStateImpl {

PyObject *asyncio_running_loop; // Strong reference

struct llist_node asyncio_tasks_head;
struct _qsbr_thread_state *qsbr; // only used by free-threaded build
struct llist_node mem_free_queue; // delayed free queue


#ifdef Py_GIL_DISABLED
struct _gc_thread_state gc;
struct _mimalloc_thread_state mimalloc;
Expand Down
38 changes: 8 additions & 30 deletions Modules/_asynciomodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,8 @@ typedef struct {
#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType)
#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType)

#ifdef Py_GIL_DISABLED
# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
#else
# define ASYNCIO_STATE_LOCK(state) ((void)state)
# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
#endif

/* State of the _asyncio module */
typedef struct {
#ifdef Py_GIL_DISABLED
PyMutex mutex;
#endif
PyTypeObject *FutureIterType;
PyTypeObject *TaskStepMethWrapper_Type;
PyTypeObject *FutureType;
Expand Down Expand Up @@ -135,11 +124,6 @@ typedef struct {
/* Counter for autogenerated Task names */
uint64_t task_name_counter;

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it. Third party tasks implementations which don't inherit from
`asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
*/
struct llist_node asyncio_tasks_head;
} asyncio_state;

static inline asyncio_state *
Expand Down Expand Up @@ -1997,16 +1981,15 @@ static PyMethodDef TaskWakeupDef = {
static void
register_task(asyncio_state *state, TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next != NULL) {
// already registered
assert(task->task_node.prev != NULL);
goto exit;
return;
}
llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
struct llist_node *head = &tstate->asyncio_tasks_head;
llist_insert_tail(head, &task->task_node);
}

static int
Expand All @@ -2018,16 +2001,13 @@ register_eager_task(asyncio_state *state, PyObject *task)
static void
unregister_task(asyncio_state *state, TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next == NULL) {
// not registered
assert(task->task_node.prev == NULL);
goto exit;
return;
}
llist_remove(&task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
}

static int
Expand Down Expand Up @@ -3767,10 +3747,10 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
return NULL;
}
int err = 0;
ASYNCIO_STATE_LOCK(state);
struct llist_node *node;

llist_for_each_safe(node, &state->asyncio_tasks_head) {
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
struct llist_node *head = &ts->asyncio_tasks_head;
llist_for_each_safe(node, head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to task
// as such it is possible that the task is concurrently
Expand All @@ -3788,7 +3768,6 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
}
}
}
ASYNCIO_STATE_UNLOCK(state);
if (err) {
return NULL;
}
Expand Down Expand Up @@ -4015,7 +3994,6 @@ module_exec(PyObject *mod)
{
asyncio_state *state = get_asyncio_state(mod);

llist_init(&state->asyncio_tasks_head);

#define CREATE_TYPE(m, tp, spec, base) \
do { \
Expand Down
7 changes: 6 additions & 1 deletion Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1519,7 +1519,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
tstate->delete_later = NULL;

llist_init(&_tstate->mem_free_queue);

llist_init(&_tstate->asyncio_tasks_head);
if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
// Start in the suspended state if there is an ongoing stop-the-world.
tstate->state = _Py_THREAD_SUSPENDED;
Expand Down Expand Up @@ -1698,6 +1698,11 @@ PyThreadState_Clear(PyThreadState *tstate)

Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);

struct llist_node *node;
llist_for_each_safe(node, &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head) {
llist_remove(node);
}

Py_CLEAR(tstate->dict);
Py_CLEAR(tstate->async_exc);

Expand Down
0