From 6e46450e32cafc2f4e23c6418618bc61f6399033 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 22 May 2023 15:08:16 -0600 Subject: [PATCH 01/29] Clear each pending call when popping it. --- Include/internal/pycore_ceval_state.h | 2 +- Python/ceval_gil.c | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Include/internal/pycore_ceval_state.h b/Include/internal/pycore_ceval_state.h index 95d1fa16ba40dc..d40f0a00b453f9 100644 --- a/Include/internal/pycore_ceval_state.h +++ b/Include/internal/pycore_ceval_state.h @@ -72,7 +72,7 @@ struct _pending_calls { Guarded by the GIL. */ int async_exc; #define NPENDINGCALLS 32 - struct { + struct _pending_call { int (*func)(void *); void *arg; } calls[NPENDINGCALLS]; diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 723cf0f4df94d0..1b60ef68acdc79 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -815,6 +815,7 @@ _pop_pending_call(struct _pending_calls *pending, *func = pending->calls[i].func; *arg = pending->calls[i].arg; + pending->calls[i] = (struct _pending_call){0}; pending->first = (i + 1) % NPENDINGCALLS; } From 3b18b7d2ecfe2bf397025a77528c0297bfe40356 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 11:14:05 -0600 Subject: [PATCH 02/29] Call _Py_FinishPendingCalls() in Py_EndInterpreter(). --- Python/pylifecycle.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Python/pylifecycle.c b/Python/pylifecycle.c index 4c21160d312465..46a400a4fc7444 100644 --- a/Python/pylifecycle.c +++ b/Python/pylifecycle.c @@ -2141,6 +2141,9 @@ Py_EndInterpreter(PyThreadState *tstate) // Wrap up existing "threading"-module-created, non-daemon threads. wait_for_thread_shutdown(tstate); + // Make any remaining pending calls. + _Py_FinishPendingCalls(tstate); + _PyAtExit_Call(tstate->interp); if (tstate != interp->threads.head || tstate->next != NULL) { From f8b483f5b8801ba017d8fde9c1dcd3c61715374e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 11:21:04 -0600 Subject: [PATCH 03/29] Factor out has_pending_calls(). --- Python/ceval_gil.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 1b60ef68acdc79..4370af30249b68 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -890,6 +890,15 @@ handle_signals(PyThreadState *tstate) return 0; } +static inline int +has_pending_calls(PyInterpreterState *interp) +{ + if (_Py_atomic_load_relaxed_int32(&interp->ceval.pending.calls_to_do)) { + return 1; + } + return 0; +} + static int make_pending_calls(PyInterpreterState *interp) { @@ -1021,7 +1030,7 @@ _Py_HandlePending(PyThreadState *tstate) } /* Pending calls */ - if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) { + if (has_pending_calls(tstate->interp)) { if (make_pending_calls(tstate->interp) != 0) { return -1; } From 44df0bc53ca7a0f9be2bc2caa94a46dc90534e22 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 11:23:22 -0600 Subject: [PATCH 04/29] Skip the calls_to_do check in _Py_FinishPendingCalls(). --- Python/ceval_gil.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 4370af30249b68..158b5010be5932 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -954,12 +954,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate) assert(PyGILState_Check()); assert(is_tstate_valid(tstate)); - struct _pending_calls *pending = &tstate->interp->ceval.pending; - - if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) { - return; - } - if (make_pending_calls(tstate->interp) < 0) { PyObject *exc = _PyErr_GetRaisedException(tstate); PyErr_BadInternalCall(); From e2a0281bc43999574d29b24223103896897508f8 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 11:46:47 -0600 Subject: [PATCH 05/29] Factor out _PyEval_MakePendingCalls(). --- Include/cpython/ceval.h | 2 ++ Modules/_queuemodule.c | 3 ++- Modules/_threadmodule.c | 3 ++- Python/ceval_gil.c | 37 ++++++++++++++++++++++++------------- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/Include/cpython/ceval.h b/Include/cpython/ceval.h index 0fbbee10c2edce..a9616bd6a4f518 100644 --- a/Include/cpython/ceval.h +++ b/Include/cpython/ceval.h @@ -22,6 +22,8 @@ PyAPI_FUNC(PyObject *) _PyEval_EvalFrameDefault(PyThreadState *tstate, struct _P PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds); PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void); +PyAPI_FUNC(int) _PyEval_MakePendingCalls(PyThreadState *); + PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc); // Old name -- remove when this API changes: _Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index d36a911a57c02c..db5be842b8a35c 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -210,6 +210,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, PyObject *item; PyLockStatus r; PY_TIMEOUT_T microseconds; + PyThreadState *tstate = PyThreadState_Get(); if (block == 0) { /* Non-blocking */ @@ -253,7 +254,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, Py_END_ALLOW_THREADS } - if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) { + if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) { return NULL; } if (r == PY_LOCK_FAILURE) { diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 5d753b4a0ebc5e..6425a58a6bb634 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -81,6 +81,7 @@ lock_dealloc(lockobject *self) static PyLockStatus acquire_timed(PyThread_type_lock lock, _PyTime_t timeout) { + PyThreadState *tstate = _PyThreadState_GET(); _PyTime_t endtime = 0; if (timeout > 0) { endtime = _PyDeadline_Init(timeout); @@ -103,7 +104,7 @@ acquire_timed(PyThread_type_lock lock, _PyTime_t timeout) /* Run signal handlers if we were interrupted. Propagate * exceptions from signal handlers, such as KeyboardInterrupt, by * passing up PY_LOCK_INTR. */ - if (Py_MakePendingCalls() < 0) { + if (_PyEval_MakePendingCalls(tstate) < 0) { return PY_LOCK_INTR; } diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 158b5010be5932..2f0b3f97e64f6b 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -962,21 +962,19 @@ _Py_FinishPendingCalls(PyThreadState *tstate) } } -/* Py_MakePendingCalls() is a simple wrapper for the sake - of backward-compatibility. */ int -Py_MakePendingCalls(void) +_PyEval_MakePendingCalls(PyThreadState *tstate) { - assert(PyGILState_Check()); - - PyThreadState *tstate = _PyThreadState_GET(); - assert(is_tstate_valid(tstate)); - - /* Python signal handler doesn't really queue a callback: it only signals - that a signal was received, see _PyEval_SignalReceived(). */ - int res = handle_signals(tstate); - if (res != 0) { - return res; + int res; + + if (_Py_IsMainThread()) { + /* Python signal handler doesn't really queue a callback: + it only signals that a signal was received, + see _PyEval_SignalReceived(). */ + res = handle_signals(tstate); + if (res != 0) { + return res; + } } res = make_pending_calls(tstate->interp); @@ -987,6 +985,19 @@ Py_MakePendingCalls(void) return 0; } +/* Py_MakePendingCalls() is a simple wrapper for the sake + of backward-compatibility. */ +int +Py_MakePendingCalls(void) +{ + assert(PyGILState_Check()); + + PyThreadState *tstate = _PyThreadState_GET(); + assert(is_tstate_valid(tstate)); + + return _PyEval_MakePendingCalls(tstate); +} + void _PyEval_InitState(PyInterpreterState *interp, PyThread_type_lock pending_lock) { From 61e5d3e24629797ea6f840aa8b926269070089e5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 11:48:21 -0600 Subject: [PATCH 06/29] Explicitly restrict Py_MakePendingCalls() to the main thread. --- Python/ceval_gil.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 2f0b3f97e64f6b..1d64bb2cbbada4 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -995,6 +995,10 @@ Py_MakePendingCalls(void) PyThreadState *tstate = _PyThreadState_GET(); assert(is_tstate_valid(tstate)); + /* Only execute pending calls on the main thread. */ + if (!_Py_IsMainThread()) { + return 0; + } return _PyEval_MakePendingCalls(tstate); } From 5af09f2b24c2fb9eee8d2b23e9e42b863ca638e5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 13:15:12 -0600 Subject: [PATCH 07/29] Factor out _make_pending_calls(). --- Python/ceval_gil.c | 60 +++++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 1d64bb2cbbada4..19ce8232ccad9e 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -900,26 +900,9 @@ has_pending_calls(PyInterpreterState *interp) } static int -make_pending_calls(PyInterpreterState *interp) +_make_pending_calls(struct _pending_calls *pending) { - /* only execute pending calls on main thread */ - if (!_Py_ThreadCanHandlePendingCalls()) { - return 0; - } - - /* don't perform recursive pending calls */ - if (interp->ceval.pending.busy) { - return 0; - } - interp->ceval.pending.busy = 1; - - /* unsignal before starting to call callbacks, so that any callback - added in-between re-signals */ - UNSIGNAL_PENDING_CALLS(interp); - int res = 0; - /* perform a bounded number of calls, in case of recursion */ - struct _pending_calls *pending = &interp->ceval.pending; for (int i=0; iceval.pending; + + /* only execute pending calls on main thread */ + if (!_Py_ThreadCanHandlePendingCalls()) { + return 0; + } + + /* don't perform recursive pending calls */ + if (pending->busy) { + return 0; + } + pending->busy = 1; + + /* unsignal before starting to call callbacks, so that any callback + added in-between re-signals */ + UNSIGNAL_PENDING_CALLS(interp); - interp->ceval.pending.busy = 0; - return res; + int res = _make_pending_calls(pending); + if (res < 0) { + pending->busy = 0; + SIGNAL_PENDING_CALLS(interp); + return -1; + } -error: - interp->ceval.pending.busy = 0; - SIGNAL_PENDING_CALLS(interp); - return res; + pending->busy = 0; + return 0; } void From 72cc24212284de50d76b45e3ca45cab5f954d352 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 13:27:37 -0600 Subject: [PATCH 08/29] Add the mainthreadonly arg to _PyEval_AddPendingCall(). --- Include/internal/pycore_ceval.h | 3 ++- Modules/signalmodule.c | 6 ++++-- Python/ceval_gil.c | 6 ++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/Include/internal/pycore_ceval.h b/Include/internal/pycore_ceval.h index ca2703781de4b0..9e9b523e7c2222 100644 --- a/Include/internal/pycore_ceval.h +++ b/Include/internal/pycore_ceval.h @@ -27,7 +27,8 @@ PyAPI_FUNC(void) _PyEval_SignalReceived(PyInterpreterState *interp); PyAPI_FUNC(int) _PyEval_AddPendingCall( PyInterpreterState *interp, int (*func)(void *), - void *arg); + void *arg, + int mainthreadonly); PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp); #ifdef HAVE_FORK extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate); diff --git a/Modules/signalmodule.c b/Modules/signalmodule.c index 2350236ad46b25..00ea4343735dab 100644 --- a/Modules/signalmodule.c +++ b/Modules/signalmodule.c @@ -314,7 +314,8 @@ trip_signal(int sig_num) still use it for this exceptional case. */ _PyEval_AddPendingCall(interp, report_wakeup_send_error, - (void *)(intptr_t) last_error); + (void *)(intptr_t) last_error, + 1); } } } @@ -333,7 +334,8 @@ trip_signal(int sig_num) still use it for this exceptional case. */ _PyEval_AddPendingCall(interp, report_wakeup_write_error, - (void *)(intptr_t)errno); + (void *)(intptr_t)errno, + 1); } } } diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 19ce8232ccad9e..21242872bc616a 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -826,8 +826,10 @@ _pop_pending_call(struct _pending_calls *pending, int _PyEval_AddPendingCall(PyInterpreterState *interp, - int (*func)(void *), void *arg) + int (*func)(void *), void *arg, + int mainthreadonly) { + assert(!mainthreadonly || _Py_IsMainInterpreter(interp)); struct _pending_calls *pending = &interp->ceval.pending; /* Ensure that _PyEval_InitState() was called and that _PyEval_FiniState() is not called yet. */ @@ -870,7 +872,7 @@ Py_AddPendingCall(int (*func)(void *), void *arg) /* Last resort: use the main interpreter */ interp = _PyInterpreterState_Main(); } - return _PyEval_AddPendingCall(interp, func, arg); + return _PyEval_AddPendingCall(interp, func, arg, 1); } static int From 88f77574b6d8a4cc1daed18eb1db449df3ba5249 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 13:34:51 -0600 Subject: [PATCH 09/29] Always use the main interpreter for Py_AddPendingCall(). --- Python/ceval_gil.c | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 21242872bc616a..2643b64d70145b 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -847,31 +847,8 @@ _PyEval_AddPendingCall(PyInterpreterState *interp, int Py_AddPendingCall(int (*func)(void *), void *arg) { - /* Best-effort to support subinterpreters and calls with the GIL released. - - First attempt _PyThreadState_GET() since it supports subinterpreters. - - If the GIL is released, _PyThreadState_GET() returns NULL . In this - case, use PyGILState_GetThisThreadState() which works even if the GIL - is released. - - Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters: - see bpo-10915 and bpo-15751. - - Py_AddPendingCall() doesn't require the caller to hold the GIL. */ - PyThreadState *tstate = _PyThreadState_GET(); - if (tstate == NULL) { - tstate = PyGILState_GetThisThreadState(); - } - - PyInterpreterState *interp; - if (tstate != NULL) { - interp = tstate->interp; - } - else { - /* Last resort: use the main interpreter */ - interp = _PyInterpreterState_Main(); - } + /* Legacy users of this API will continue to target the main thread. */ + PyInterpreterState *interp = _PyInterpreterState_Main(); return _PyEval_AddPendingCall(interp, func, arg, 1); } From d24fafb7da4834140c3471823df7f9efb49d31e3 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 11:10:59 -0600 Subject: [PATCH 10/29] Add _PyRuntime.ceval.pending_mainthread. --- Include/internal/pycore_ceval_state.h | 38 ++++++++++++++------------- Python/pystate.c | 35 +++++++++--------------- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/Include/internal/pycore_ceval_state.h b/Include/internal/pycore_ceval_state.h index d40f0a00b453f9..e56e43c6e0c6a7 100644 --- a/Include/internal/pycore_ceval_state.h +++ b/Include/internal/pycore_ceval_state.h @@ -13,6 +13,24 @@ extern "C" { #include "pycore_gil.h" // struct _gil_runtime_state +struct _pending_calls { + int busy; + PyThread_type_lock lock; + /* Request for running pending calls. */ + _Py_atomic_int calls_to_do; + /* Request for looking at the `async_exc` field of the current + thread state. + Guarded by the GIL. */ + int async_exc; +#define NPENDINGCALLS 32 + struct _pending_call { + int (*func)(void *); + void *arg; + } calls[NPENDINGCALLS]; + int first; + int last; +}; + typedef enum { PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized @@ -49,6 +67,8 @@ struct _ceval_runtime_state { the main thread of the main interpreter can handle signals: see _Py_ThreadCanHandleSignals(). */ _Py_atomic_int signals_pending; + /* Pending calls to be made only on the main thread. */ + struct _pending_calls pending_mainthread; }; #ifdef PY_HAVE_PERF_TRAMPOLINE @@ -62,24 +82,6 @@ struct _ceval_runtime_state { #endif -struct _pending_calls { - int busy; - PyThread_type_lock lock; - /* Request for running pending calls. */ - _Py_atomic_int calls_to_do; - /* Request for looking at the `async_exc` field of the current - thread state. - Guarded by the GIL. */ - int async_exc; -#define NPENDINGCALLS 32 - struct _pending_call { - int (*func)(void *); - void *arg; - } calls[NPENDINGCALLS]; - int first; - int last; -}; - struct _ceval_state { /* This single variable consolidates all requests to break out of the fast path in the eval loop. */ diff --git a/Python/pystate.c b/Python/pystate.c index 5b7a6c86ade4d7..67493fc89bc0d1 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -380,7 +380,16 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime); _Py_COMP_DIAG_POP -#define NUMLOCKS 5 +#define NUMLOCKS 6 +#define LOCKPTRS(runtime) \ + { \ + &(runtime)->interpreters.mutex, \ + &(runtime)->xidregistry.mutex, \ + &(runtime)->getargs.mutex, \ + &(runtime)->unicode_state.ids.lock, \ + &(runtime)->imports.extensions.mutex, \ + &(runtime)->ceval.pending_mainthread.lock, \ + } static int alloc_for_runtime(PyThread_type_lock locks[NUMLOCKS]) @@ -427,13 +436,7 @@ init_runtime(_PyRuntimeState *runtime, PyPreConfig_InitPythonConfig(&runtime->preconfig); - PyThread_type_lock *lockptrs[NUMLOCKS] = { - &runtime->interpreters.mutex, - &runtime->xidregistry.mutex, - &runtime->getargs.mutex, - &runtime->unicode_state.ids.lock, - &runtime->imports.extensions.mutex, - }; + PyThread_type_lock *lockptrs[NUMLOCKS] = LOCKPTRS(runtime); for (int i = 0; i < NUMLOCKS; i++) { assert(locks[i] != NULL); *lockptrs[i] = locks[i]; @@ -512,13 +515,7 @@ _PyRuntimeState_Fini(_PyRuntimeState *runtime) LOCK = NULL; \ } - PyThread_type_lock *lockptrs[NUMLOCKS] = { - &runtime->interpreters.mutex, - &runtime->xidregistry.mutex, - &runtime->getargs.mutex, - &runtime->unicode_state.ids.lock, - &runtime->imports.extensions.mutex, - }; + PyThread_type_lock *lockptrs[NUMLOCKS] = LOCKPTRS(runtime); for (int i = 0; i < NUMLOCKS; i++) { FREE_LOCK(*lockptrs[i]); } @@ -541,13 +538,7 @@ _PyRuntimeState_ReInitThreads(_PyRuntimeState *runtime) PyMemAllocatorEx old_alloc; _PyMem_SetDefaultAllocator(PYMEM_DOMAIN_RAW, &old_alloc); - PyThread_type_lock *lockptrs[NUMLOCKS] = { - &runtime->interpreters.mutex, - &runtime->xidregistry.mutex, - &runtime->getargs.mutex, - &runtime->unicode_state.ids.lock, - &runtime->imports.extensions.mutex, - }; + PyThread_type_lock *lockptrs[NUMLOCKS] = LOCKPTRS(runtime); int reinit_err = 0; for (int i = 0; i < NUMLOCKS; i++) { reinit_err += _PyThread_at_fork_reinit(lockptrs[i]); From 1701fa31420dd2fd330d6c8e9161c260fecc2ba4 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 13:43:54 -0600 Subject: [PATCH 11/29] Run per-interpreter pending calls in any thread. --- Python/ceval_gil.c | 47 +++++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 2643b64d70145b..49038d3d177a86 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp, _Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request) | (_Py_atomic_load_relaxed_int32(&ceval->signals_pending) && _Py_ThreadCanHandleSignals(interp)) - | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do) - && _Py_ThreadCanHandlePendingCalls()) + | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)) + | (_Py_IsMainThread() + &&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do)) | ceval2->pending.async_exc | _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled)); } @@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp) static inline void -SIGNAL_PENDING_CALLS(PyInterpreterState *interp) +SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp) { struct _ceval_runtime_state *ceval = &interp->runtime->ceval; struct _ceval_state *ceval2 = &interp->ceval; - _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1); + _Py_atomic_store_relaxed(&pending->calls_to_do, 1); COMPUTE_EVAL_BREAKER(interp, ceval, ceval2); } @@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp) { struct _ceval_runtime_state *ceval = &interp->runtime->ceval; struct _ceval_state *ceval2 = &interp->ceval; + if (_Py_IsMainInterpreter(interp)) { + _Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0); + } _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0); COMPUTE_EVAL_BREAKER(interp, ceval, ceval2); } @@ -831,6 +835,11 @@ _PyEval_AddPendingCall(PyInterpreterState *interp, { assert(!mainthreadonly || _Py_IsMainInterpreter(interp)); struct _pending_calls *pending = &interp->ceval.pending; + if (mainthreadonly) { + /* The main thread only exists in the main interpreter. */ + assert(_Py_IsMainInterpreter(interp)); + pending = &_PyRuntime.ceval.pending_mainthread; + } /* Ensure that _PyEval_InitState() was called and that _PyEval_FiniState() is not called yet. */ assert(pending->lock != NULL); @@ -840,7 +849,7 @@ _PyEval_AddPendingCall(PyInterpreterState *interp, PyThread_release_lock(pending->lock); /* signal main loop */ - SIGNAL_PENDING_CALLS(interp); + SIGNAL_PENDING_CALLS(pending, interp); return result; } @@ -872,10 +881,15 @@ handle_signals(PyThreadState *tstate) static inline int has_pending_calls(PyInterpreterState *interp) { - if (_Py_atomic_load_relaxed_int32(&interp->ceval.pending.calls_to_do)) { + struct _pending_calls *pending = &interp->ceval.pending; + if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) { return 1; } - return 0; + if (!_Py_IsMainThread()) { + return 0; + } + pending = &_PyRuntime.ceval.pending_mainthread; + return _Py_atomic_load_relaxed_int32(&pending->calls_to_do); } static int @@ -906,11 +920,7 @@ static int make_pending_calls(PyInterpreterState *interp) { struct _pending_calls *pending = &interp->ceval.pending; - - /* only execute pending calls on main thread */ - if (!_Py_ThreadCanHandlePendingCalls()) { - return 0; - } + struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread; /* don't perform recursive pending calls */ if (pending->busy) { @@ -922,13 +932,20 @@ make_pending_calls(PyInterpreterState *interp) added in-between re-signals */ UNSIGNAL_PENDING_CALLS(interp); - int res = _make_pending_calls(pending); - if (res < 0) { + if (_make_pending_calls(pending) != 0) { pending->busy = 0; - SIGNAL_PENDING_CALLS(interp); + SIGNAL_PENDING_CALLS(pending, interp); return -1; } + if (_Py_IsMainThread()) { + if (_make_pending_calls(pending_main) != 0) { + pending->busy = 0; + SIGNAL_PENDING_CALLS(pending_main, interp); + return -1; + } + } + pending->busy = 0; return 0; } From c6cfacaca7ee0159753546464689a00c88400b6c Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 23 May 2023 13:44:08 -0600 Subject: [PATCH 12/29] Drop _Py_ThreadCanHandlePendingCalls(). --- Include/internal/pycore_pystate.h | 8 -------- 1 file changed, 8 deletions(-) diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h index daa40cf4bcd855..43652c4405ec1a 100644 --- a/Include/internal/pycore_pystate.h +++ b/Include/internal/pycore_pystate.h @@ -60,14 +60,6 @@ _Py_ThreadCanHandleSignals(PyInterpreterState *interp) } -/* Only execute pending calls on the main thread. */ -static inline int -_Py_ThreadCanHandlePendingCalls(void) -{ - return _Py_IsMainThread(); -} - - /* Variable and static inline functions for in-line access to current thread and interpreter state */ From dc11024a6f37a6c5bb645f81cd05ff29e31515f3 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 25 May 2023 12:23:45 -0600 Subject: [PATCH 13/29] Expand the handle_eval_breaker comment. --- Python/ceval.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/Python/ceval.c b/Python/ceval.c index e81b6beedfcee1..b89c2c2a319b15 100644 --- a/Python/ceval.c +++ b/Python/ceval.c @@ -758,6 +758,60 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, _PyInterpreterFrame *frame, int * We need to do reasonably frequently, but not too frequently. * All loops should include a check of the eval breaker. * We also check on return from any builtin function. + * + * ## More Details ### + * + * The eval loop (this function) normally executes the instructions + * of a code object sequentially. However, the runtime supports a + * number of out-of-band execution scenarios that may pause that + * sequential execution long enough to do that out-of-band work + * in the current thread using the current PyThreadState. + * + * The scenarios include: + * + * - cyclic garbage collection + * - GIL drop requests + * - "async" exceptions + * - "pending calls" + * - signal handling (only in the main thread) + * + * When the need for one of the above is detected, the eval loop + * calls _Py_HandlePending() (from ceval_gil.c). Then, if that + * didn't trigger an exception, the eval loop resumes executing + * the sequential instructions. + * + * To make this work, the eval loop periodically checks if any + * of the above needs to happen. The individual checks can be + * expensive if computed each time, so a while back we switched + * to using pre-computed, per-interpreter variables for the checks, + * and later consolidated that to a single "eval breaker" variable. + * (See PyInterpreterState.ceval.eval_breaker in pycore_ceval_state.h.) + * + * For the longest time, the eval breaker check would happen + * frequently, every 5 or so times through the loop, regardless + * of what instruction ran last or what would run next. Then, in + * early 2021 (gh-18334, commit 4958f5d), we switched to checking + * the eval breaker less frequently, by hard-coding the check to + * specific places in the eval loop (e.g. certain instructions). + * The intent then was to check after returning from calls + * and on the back edges of loops. + * + * In addition to being more efficient, that approach keeps + * the eval loop from running arbitrary code between instructions + * that don't handle that well. (See gh-74174.) + * + * Currently, the eval breaker check happens here at the + * "handle_eval_breaker" label. Some instructions come here + * explicitly (goto) and some indirectly via the CHECK_EVAL_BREAKER + * macro (see ceval_macros.h). Notably, the check happens at the + * end of the JUMP_BACKWARD instruction, which pretty much applies + * to all loops. The same applies to the CALL instruction and + * many (but not all) of the CALL_* instructions. + * See bytecodes.c for exact information. + * + * One consequence of this approach is that it can be tricky + * to force any specific thread to pick up the eval breaker, + * or for any specific thread to not pick it up. */ if (_Py_HandlePending(tstate) != 0) { goto error; From 6c3d06cf137c554f8cf0c5a0b6bd86f2f7283f12 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 30 May 2023 18:35:09 -0600 Subject: [PATCH 14/29] Do not require faulthandler for test.support.threading_helper.start_threads(). --- Lib/test/support/threading_helper.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Lib/test/support/threading_helper.py b/Lib/test/support/threading_helper.py index b9973c8bf5c914..7f16050f32b9d1 100644 --- a/Lib/test/support/threading_helper.py +++ b/Lib/test/support/threading_helper.py @@ -115,7 +115,11 @@ def join_thread(thread, timeout=None): @contextlib.contextmanager def start_threads(threads, unlock=None): - import faulthandler + try: + import faulthandler + except ImportError: + # It isn't supported on subinterpreters yet. + faulthandler = None threads = list(threads) started = [] try: @@ -147,7 +151,8 @@ def start_threads(threads, unlock=None): finally: started = [t for t in started if t.is_alive()] if started: - faulthandler.dump_traceback(sys.stdout) + if faulthandler is not None: + faulthandler.dump_traceback(sys.stdout) raise AssertionError('Unable to join %d threads' % len(started)) From 69ff9e632d6385ac7552fcd5f8fec613467263b5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 25 May 2023 08:32:31 -0600 Subject: [PATCH 15/29] Add tests. --- Lib/test/test_capi/test_misc.py | 247 +++++++++++++++++++++++++++++++- Modules/_testinternalcapi.c | 124 +++++++++++++++- 2 files changed, 369 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index 3a4937c3bf5faf..fef83ec6894591 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -2,22 +2,26 @@ # these are all functions _testcapi exports whose name begins with 'test_'. import _thread -from collections import OrderedDict +from collections import OrderedDict, deque import contextlib import importlib.machinery import importlib.util +import json import os import pickle +import queue import random import subprocess import sys import textwrap import threading import time +import types import unittest import warnings import weakref import operator +import _xxsubinterpreters as _interpreters from test import support from test.support import MISSING_C_DOCSTRINGS from test.support import import_helper @@ -1231,6 +1235,10 @@ def test_pyobject_getitemdata_error(self): class TestPendingCalls(unittest.TestCase): + # See the comment in ceval.c (at the "handle_eval_breaker" label) + # about when pending calls get run. This is especially relevant + # here for creating deterministic tests. + def pendingcalls_submit(self, l, n): def callback(): #this function can be interrupted by thread switching so let's @@ -1313,6 +1321,243 @@ def genf(): yield gen = genf() self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code) + class PendingTask(types.SimpleNamespace): + + _add_pending = _testinternalcapi.pending_threadfunc + + def __init__(self, req, taskid=None, notify_done=None): + self.id = taskid + self.req = req + self.notify_done = notify_done + + self.creator_tid = threading.get_ident() + self.requester_tid = None + self.runner_tid = None + self.result = None + + def run(self): + assert self.result is None + self.runner_tid = threading.get_ident() + self._run() + if self.notify_done is not None: + self.notify_done() + + def _run(self): + self.result = self.req + + def run_in_pending_call(self, worker_tids): + assert self._add_pending is _testinternalcapi.pending_threadfunc + self.requester_tid = threading.get_ident() + def callback(): + assert self.result is None + # It can be tricky to control which thread handles + # the eval breaker, so we take a naive approach to + # make sure. + if threading.get_ident() not in worker_tids: + self._add_pending(callback, ensure_added=True) + return + self.run() + self._add_pending(callback, ensure_added=True) + + def create_thread(self, worker_tids): + return threading.Thread( + target=self.run_in_pending_call, + args=(worker_tids,), + ) + + def wait_for_result(self): + while self.result is None: + time.sleep(0.01) + + def test_subthreads_can_handle_pending_calls(self): + payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!' + + task = self.PendingTask(payload) + def do_the_work(): + tid = threading.get_ident() + t = task.create_thread({tid}) + with threading_helper.start_threads([t]): + task.wait_for_result() + t = threading.Thread(target=do_the_work) + with threading_helper.start_threads([t]): + pass + + self.assertEqual(task.result, payload) + + def test_many_subthreads_can_handle_pending_calls(self): + main_tid = threading.get_ident() + self.assertEqual(threading.main_thread().ident, main_tid) + + # We can't use queue.Queue since it isn't reentrant relative + # to pending calls. + _queue = deque() + _active = deque() + _done_lock = threading.Lock() + def queue_put(task): + _queue.append(task) + _active.append(True) + def queue_get(): + try: + task = _queue.popleft() + except IndexError: + raise queue.Empty + return task + def queue_task_done(): + _active.pop() + if not _active: + try: + _done_lock.release() + except RuntimeError: + assert not _done_lock.locked() + def queue_empty(): + return not _queue + def queue_join(): + _done_lock.acquire() + _done_lock.release() + + tasks = [] + for i in range(20): + task = self.PendingTask( + req=f'request {i}', + taskid=i, + notify_done=queue_task_done, + ) + tasks.append(task) + queue_put(task) + # This will be released once all the tasks have finished. + _done_lock.acquire() + + def add_tasks(worker_tids): + while True: + if done: + return + try: + task = queue_get() + except queue.Empty: + break + task.run_in_pending_call(worker_tids) + + done = False + def run_tasks(): + while not queue_empty(): + if done: + return + time.sleep(0.01) + # Give the worker a chance to handle any remaining pending calls. + while not done: + time.sleep(0.01) + + # Start the workers and wait for them to finish. + worker_threads = [threading.Thread(target=run_tasks) + for _ in range(3)] + with threading_helper.start_threads(worker_threads): + try: + # Add a pending call for each task. + worker_tids = [t.ident for t in worker_threads] + threads = [threading.Thread(target=add_tasks, args=(worker_tids,)) + for _ in range(3)] + with threading_helper.start_threads(threads): + try: + pass + except BaseException: + done = True + raise # re-raise + # Wait for the pending calls to finish. + queue_join() + # Notify the workers that they can stop. + done = True + except BaseException: + done = True + raise # re-raise + runner_tids = [t.runner_tid for t in tasks] + + self.assertNotIn(main_tid, runner_tids) + for task in tasks: + with self.subTest(f'task {task.id}'): + self.assertNotEqual(task.requester_tid, main_tid) + self.assertNotEqual(task.requester_tid, task.runner_tid) + self.assertNotIn(task.requester_tid, runner_tids) + + def test_isolated_subinterpreter(self): + # We exercise the most important permutations. + + interpid = _interpreters.create() + _interpreters.run_string(interpid, f"""if True: + import os + import threading + import time + import _testinternalcapi + from test.support import threading_helper + """) + + def create_pipe(): + r, w = os.pipe() + self.addCleanup(lambda: os.close(r)) + self.addCleanup(lambda: os.close(w)) + return r, w + + with self.subTest('add in main, run in subinterpreter'): + r_from_main, w_to_sub = create_pipe() + r_from_sub, w_to_main = create_pipe() + + def do_work(): + _interpreters.run_string(interpid, f"""if True: + # Wait until we handle the pending call. + while not os.read({r_from_main}, 1): + time.sleep(0.01) + """) + t = threading.Thread(target=do_work) + with threading_helper.start_threads([t]): + # Add the pending call. + _testinternalcapi.pending_fd_identify(interpid, w_to_main) + # Wait for it to be done. + text = None + while not text: + text = os.read(r_from_sub, 250) + # Signal the subinterpreter to stop. + os.write(w_to_sub, b'spam') + data = json.loads(text) + + self.assertEqual(data['interpid'], int(interpid)) + + with self.subTest('add in main, run in subinterpreter sub-thread'): + r_from_main, w_to_sub = create_pipe() + r_from_sub, w_to_main = create_pipe() + + def do_work(): + _interpreters.run_string(interpid, f"""if True: + def subthread(): + import importlib.util + # Wait until we handle the pending call. + while not os.read({r_from_main}, 1): + time.sleep(0.01) + t = threading.Thread(target=subthread) + with threading_helper.start_threads([t]): + pass + """) + t = threading.Thread(target=do_work) + with threading_helper.start_threads([t]): + # Add the pending call. + _testinternalcapi.pending_fd_identify(interpid, w_to_main) + # Wait for it to be done. + text = None + while not text: + text = os.read(r_from_sub, 250) + # Signal the subinterpreter to stop. + os.write(w_to_sub, b'spam') + data = json.loads(text) + + self.assertEqual(data['interpid'], int(interpid)) + + with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'): + pass + + with self.subTest('add in subinterpreter, run in main'): + pass + + with self.subTest('add in subinterpreter, run in sub-thread'): + pass + class SubinterpreterTest(unittest.TestCase): diff --git a/Modules/_testinternalcapi.c b/Modules/_testinternalcapi.c index b43dc7fbf3236c..bff1c172efcafe 100644 --- a/Modules/_testinternalcapi.c +++ b/Modules/_testinternalcapi.c @@ -16,13 +16,15 @@ #include "pycore_atomic_funcs.h" // _Py_atomic_int_get() #include "pycore_bitutils.h" // _Py_bswap32() #include "pycore_compile.h" // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble +#include "pycore_ceval.h" // _PyEval_AddPendingCall #include "pycore_fileutils.h" // _Py_normpath #include "pycore_frame.h" // _PyInterpreterFrame #include "pycore_gc.h" // PyGC_Head #include "pycore_hashtable.h" // _Py_hashtable_new() #include "pycore_initconfig.h" // _Py_GetConfigsAsDict() -#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal() #include "pycore_interp.h" // _PyInterpreterState_GetConfigCopy() +#include "pycore_interpreteridobject.h" // _PyInterpreterID_LookUp() +#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal() #include "pycore_pyerrors.h" // _Py_UTF8_Edit_Cost() #include "pycore_pystate.h" // _PyThreadState_GET() #include "osdefs.h" // MAXPATHLEN @@ -838,6 +840,123 @@ set_optimizer(PyObject *self, PyObject *opt) Py_RETURN_NONE; } + +static int _pending_callback(void *arg) +{ + /* we assume the argument is callable object to which we own a reference */ + PyObject *callable = (PyObject *)arg; + PyObject *r = PyObject_CallNoArgs(callable); + Py_DECREF(callable); + Py_XDECREF(r); + return r != NULL ? 0 : -1; +} + +/* The following requests n callbacks to _pending_callback. It can be + * run from any python thread. + */ +static PyObject * +pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs) +{ + PyObject *callable; + int ensure_added = 0; + static char *kwlist[] = {"", "ensure_added", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + "O|$p:pending_threadfunc", kwlist, + &callable, &ensure_added)) + { + return NULL; + } + PyInterpreterState *interp = PyInterpreterState_Get(); + + /* create the reference for the callbackwhile we hold the lock */ + Py_INCREF(callable); + + int r; + Py_BEGIN_ALLOW_THREADS + r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0); + Py_END_ALLOW_THREADS + if (r < 0) { + /* unsuccessful add */ + if (!ensure_added) { + Py_DECREF(callable); + Py_RETURN_FALSE; + } + do { + Py_BEGIN_ALLOW_THREADS + r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0); + Py_END_ALLOW_THREADS + } while (r < 0); + } + + Py_RETURN_TRUE; +} + + +struct _pending_fd_data { + int fileno; + const char *text; +}; + +static int _pending_fd_callback(void *arg) +{ + union { + int value; + void *voidptr; + } data; + data.voidptr = arg; + int fileno = data.value; + + /* Generate the text payload. */ + PyThreadState *tstate = PyThreadState_Get(); + int64_t interpid = PyInterpreterState_GetID(tstate->interp); + char buffer[256]; + snprintf(buffer, 256, "{\"interpid\": %ld, \"threadid\": %ld}", + interpid, tstate->thread_id); + + /* Call os.write(fileno, text). */ + PyObject *os = PyImport_ImportModule("os"); + assert(os != NULL); + PyObject *result = PyObject_CallMethod(os, "write", "iy", fileno, buffer); + if (result == NULL) { + return -1; + } + Py_DECREF(result); + return 0; +} + +static PyObject * +pending_fd_identify(PyObject *self, PyObject *args) +{ + PyObject *interpid; + union { + int value; + void *voidptr; + } fileno; + if (!PyArg_ParseTuple(args, "Oi:pending_fd_identify", + &interpid, &fileno.value)) + { + return NULL; + } + PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid); + if (interp == NULL) { + if (!PyErr_Occurred()) { + PyErr_SetString(PyExc_ValueError, "interpreter not found"); + } + return NULL; + } + + int r; + do { + Py_BEGIN_ALLOW_THREADS + r = _PyEval_AddPendingCall(interp, &_pending_fd_callback, + fileno.voidptr, 0); + Py_END_ALLOW_THREADS + } while (r < 0); + + Py_RETURN_NONE; +} + + static PyMethodDef module_functions[] = { {"get_configs", get_configs, METH_NOARGS}, {"get_recursion_depth", get_recursion_depth, METH_NOARGS}, @@ -868,6 +987,9 @@ static PyMethodDef module_functions[] = { {"iframe_getlasti", iframe_getlasti, METH_O, NULL}, {"set_optimizer", set_optimizer, METH_O, NULL}, {"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL}, + {"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc), + METH_VARARGS | METH_KEYWORDS}, + {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL}, {NULL, NULL} /* sentinel */ }; From fdde46d13de22367b87ac29509f0bdeb78ae9678 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 2 Jun 2023 15:15:51 -0600 Subject: [PATCH 16/29] Add a NEWS entry. --- .../2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst new file mode 100644 index 00000000000000..88f5ff926ad350 --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst @@ -0,0 +1,8 @@ +The "pending call" machinery now works for all interpreters, not just the +main interpreter, and runs in all threads, not just the main thread. Some +calls are still only done in the main thread, ergo in the main interpreter. +This change does not affect the existing public C-API +(``Py_AddPendingCall()``) which still only targets the main thread. The new +functionality is meant strictly for internal use. This change brings the +capability in line with the intention when the state was made +per-interpreter several years ago. From c1fb6474a4fe72ce11130e0eac7c11aa7ef61eae Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 5 Jun 2023 15:42:22 -0600 Subject: [PATCH 17/29] Skip the test if subinterpreters not supported. --- Lib/test/test_capi/test_misc.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index fef83ec6894591..c15800318a38f7 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -21,7 +21,6 @@ import warnings import weakref import operator -import _xxsubinterpreters as _interpreters from test import support from test.support import MISSING_C_DOCSTRINGS from test.support import import_helper @@ -41,6 +40,10 @@ import _testsinglephase except ImportError: _testsinglephase = None +try: + import _xxsubinterpreters as _interpreters +except ModuleNotFoundError: + _interpreters = None # Skip this test if the _testcapi module isn't available. _testcapi = import_helper.import_module('_testcapi') @@ -52,6 +55,12 @@ def decode_stderr(err): return err.decode('utf-8', 'replace').replace('\r', '') +def requires_subinterpreters(meth): + """Decorator to skip a test if subinterpreters are not supported.""" + return unittest.skipIf(_interpreters is None, + 'subinterpreters required')(meth) + + def testfunction(self): """some doc""" return self @@ -1478,6 +1487,7 @@ def run_tasks(): self.assertNotEqual(task.requester_tid, task.runner_tid) self.assertNotIn(task.requester_tid, runner_tids) + @requires_subinterpreters def test_isolated_subinterpreter(self): # We exercise the most important permutations. From e06b6f7debf2de315a747f34c0aaaa2fa01353e6 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 6 Jun 2023 14:25:14 -0600 Subject: [PATCH 18/29] Adjust UNSIGNAL_PENDING_CALLS(). --- Python/ceval_gil.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 49038d3d177a86..f311389f7caf3a 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -110,7 +110,8 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp) { struct _ceval_runtime_state *ceval = &interp->runtime->ceval; struct _ceval_state *ceval2 = &interp->ceval; - if (_Py_IsMainInterpreter(interp)) { + if (_Py_IsMainThread()) { + assert(_Py_IsMainInterpreter(interp)); _Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0); } _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0); From 2fabab77c04b6ec988abaec509e8f338a869a389 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 6 Jun 2023 16:31:21 -0600 Subject: [PATCH 19/29] Be more careful in make_pending_calls(). --- Python/ceval_gil.c | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index f311389f7caf3a..99fa5b293ea11e 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -923,11 +923,25 @@ make_pending_calls(PyInterpreterState *interp) struct _pending_calls *pending = &interp->ceval.pending; struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread; - /* don't perform recursive pending calls */ + /* Only one thread (per interpreter) may run the pending calls + at once. In the same way, we don't do recursive pending calls. */ + PyThread_acquire_lock(pending->lock, WAIT_LOCK); if (pending->busy) { + /* A pending call was added after another thread was already + handling the pending calls (and had already "unsignaled"). + Once that thread is done, it may have taken care of all the + pending calls, or there might be some still waiting. + Regardless, this interpreter's pending calls will stay + "signaled" until that first thread has finished. At that + point the next thread to trip the eval breaker will take + care of any remaining pending calls. Until then, though, + all the interpreter's threads will be tripping the eval + breaker every time it's checked. */ + PyThread_release_lock(pending->lock); return 0; } pending->busy = 1; + PyThread_release_lock(pending->lock); /* unsignal before starting to call callbacks, so that any callback added in-between re-signals */ @@ -935,6 +949,7 @@ make_pending_calls(PyInterpreterState *interp) if (_make_pending_calls(pending) != 0) { pending->busy = 0; + /* There might not be more calls to make, but we play it safe. */ SIGNAL_PENDING_CALLS(pending, interp); return -1; } @@ -942,6 +957,7 @@ make_pending_calls(PyInterpreterState *interp) if (_Py_IsMainThread()) { if (_make_pending_calls(pending_main) != 0) { pending->busy = 0; + /* There might not be more calls to make, but we play it safe. */ SIGNAL_PENDING_CALLS(pending_main, interp); return -1; } From 8445fc02bee2fe3f5bb82044fd5244abfb3fd418 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 6 Jun 2023 16:32:42 -0600 Subject: [PATCH 20/29] The main thread may be used by subinterpreters. --- Python/ceval_gil.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 99fa5b293ea11e..8204cd9998dc68 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -69,7 +69,7 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp, | (_Py_atomic_load_relaxed_int32(&ceval->signals_pending) && _Py_ThreadCanHandleSignals(interp)) | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)) - | (_Py_IsMainThread() + | (_Py_IsMainThread() && _Py_IsMainInterpreter(interp) &&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do)) | ceval2->pending.async_exc | _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled)); @@ -110,8 +110,7 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp) { struct _ceval_runtime_state *ceval = &interp->runtime->ceval; struct _ceval_state *ceval2 = &interp->ceval; - if (_Py_IsMainThread()) { - assert(_Py_IsMainInterpreter(interp)); + if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) { _Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0); } _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0); @@ -886,7 +885,7 @@ has_pending_calls(PyInterpreterState *interp) if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) { return 1; } - if (!_Py_IsMainThread()) { + if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) { return 0; } pending = &_PyRuntime.ceval.pending_mainthread; @@ -954,7 +953,7 @@ make_pending_calls(PyInterpreterState *interp) return -1; } - if (_Py_IsMainThread()) { + if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) { if (_make_pending_calls(pending_main) != 0) { pending->busy = 0; /* There might not be more calls to make, but we play it safe. */ @@ -986,7 +985,7 @@ _PyEval_MakePendingCalls(PyThreadState *tstate) { int res; - if (_Py_IsMainThread()) { + if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) { /* Python signal handler doesn't really queue a callback: it only signals that a signal was received, see _PyEval_SignalReceived(). */ @@ -1015,7 +1014,7 @@ Py_MakePendingCalls(void) assert(is_tstate_valid(tstate)); /* Only execute pending calls on the main thread. */ - if (!_Py_IsMainThread()) { + if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) { return 0; } return _PyEval_MakePendingCalls(tstate); From a83a321142bed73c6d269dd4460e353c3b1fd2c9 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 6 Jun 2023 16:33:05 -0600 Subject: [PATCH 21/29] Factor out _next_pending_call(). --- Python/ceval_gil.c | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 8204cd9998dc68..86a5d5940fd106 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -807,20 +807,31 @@ _push_pending_call(struct _pending_calls *pending, return 0; } -/* Pop one item off the queue while holding the lock. */ -static void -_pop_pending_call(struct _pending_calls *pending, - int (**func)(void *), void **arg) +static int +_next_pending_call(struct _pending_calls *pending, + int (**func)(void *), void **arg) { int i = pending->first; if (i == pending->last) { - return; /* Queue empty */ + /* Queue empty */ + assert(pending->calls[i].func == NULL); + return -1; } - *func = pending->calls[i].func; *arg = pending->calls[i].arg; - pending->calls[i] = (struct _pending_call){0}; - pending->first = (i + 1) % NPENDINGCALLS; + return i; +} + +/* Pop one item off the queue while holding the lock. */ +static void +_pop_pending_call(struct _pending_calls *pending, + int (**func)(void *), void **arg) +{ + int i = _next_pending_call(pending, func, arg); + if (i >= 0) { + pending->calls[i] = (struct _pending_call){0}; + pending->first = (i + 1) % NPENDINGCALLS; + } } /* This implementation is thread-safe. It allows From aca2a8ca233249d69ffd0f6d7051cc21dacf045a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 6 Jun 2023 16:37:21 -0600 Subject: [PATCH 22/29] has_pending_calls() -> maybe_has_pending_calls(). --- Python/ceval_gil.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index 86a5d5940fd106..ccd960c38faef5 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -890,7 +890,7 @@ handle_signals(PyThreadState *tstate) } static inline int -has_pending_calls(PyInterpreterState *interp) +maybe_has_pending_calls(PyInterpreterState *interp) { struct _pending_calls *pending = &interp->ceval.pending; if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) { @@ -1068,7 +1068,7 @@ _Py_HandlePending(PyThreadState *tstate) } /* Pending calls */ - if (has_pending_calls(tstate->interp)) { + if (maybe_has_pending_calls(tstate->interp)) { if (make_pending_calls(tstate->interp) != 0) { return -1; } From 7b8b8da1a05552e93250061a2b70c6bb6893e496 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 12 Jun 2023 12:23:04 -0600 Subject: [PATCH 23/29] Drop a dead import. --- Lib/test/test_capi/test_misc.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index 070032625d6f85..a142ca2a99ed93 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -1536,7 +1536,6 @@ def do_work(): def do_work(): _interpreters.run_string(interpid, f"""if True: def subthread(): - import importlib.util # Wait until we handle the pending call. while not os.read({r_from_main}, 1): time.sleep(0.01) From 90b3a1f299a51ac56bd64aa997cdf687cd921d3b Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 12 Jun 2023 12:23:44 -0600 Subject: [PATCH 24/29] Clarify some comments. --- Lib/test/test_capi/test_misc.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index a142ca2a99ed93..58f6eecc1c6490 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -1490,6 +1490,10 @@ def run_tasks(): def test_isolated_subinterpreter(self): # We exercise the most important permutations. + # This test relies on pending calls getting called + # (eval breaker tripped) at each loop iteration + # and at each call. + interpid = _interpreters.create() _interpreters.run_string(interpid, f"""if True: import os @@ -1511,7 +1515,7 @@ def create_pipe(): def do_work(): _interpreters.run_string(interpid, f"""if True: - # Wait until we handle the pending call. + # Wait until this interp has handled the pending call. while not os.read({r_from_main}, 1): time.sleep(0.01) """) @@ -1536,7 +1540,7 @@ def do_work(): def do_work(): _interpreters.run_string(interpid, f"""if True: def subthread(): - # Wait until we handle the pending call. + # Wait until this interp has handled the pending call. while not os.read({r_from_main}, 1): time.sleep(0.01) t = threading.Thread(target=subthread) From 4f0068db8fda1efb866b04de68e477fc684f9749 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 12 Jun 2023 12:43:01 -0600 Subject: [PATCH 25/29] Add timeouts. --- Lib/test/test_capi/test_misc.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index 58f6eecc1c6490..b7d769c81ffd57 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -1512,12 +1512,15 @@ def create_pipe(): with self.subTest('add in main, run in subinterpreter'): r_from_main, w_to_sub = create_pipe() r_from_sub, w_to_main = create_pipe() + timeout = time.time() + 30 # seconds def do_work(): _interpreters.run_string(interpid, f"""if True: # Wait until this interp has handled the pending call. while not os.read({r_from_main}, 1): time.sleep(0.01) + if time.time() > {timeout}: + raise Exception('timed out!') """) t = threading.Thread(target=do_work) with threading_helper.start_threads([t]): @@ -1526,6 +1529,8 @@ def do_work(): # Wait for it to be done. text = None while not text: + if time.time() > timeout: + raise Exception('timed out!') text = os.read(r_from_sub, 250) # Signal the subinterpreter to stop. os.write(w_to_sub, b'spam') @@ -1536,6 +1541,7 @@ def do_work(): with self.subTest('add in main, run in subinterpreter sub-thread'): r_from_main, w_to_sub = create_pipe() r_from_sub, w_to_main = create_pipe() + timeout = time.time() + 30 # seconds def do_work(): _interpreters.run_string(interpid, f"""if True: @@ -1543,6 +1549,8 @@ def subthread(): # Wait until this interp has handled the pending call. while not os.read({r_from_main}, 1): time.sleep(0.01) + if time.time() > {timeout}: + raise Exception('timed out!') t = threading.Thread(target=subthread) with threading_helper.start_threads([t]): pass @@ -1554,6 +1562,8 @@ def subthread(): # Wait for it to be done. text = None while not text: + if time.time() > timeout: + raise Exception('timed out!') text = os.read(r_from_sub, 250) # Signal the subinterpreter to stop. os.write(w_to_sub, b'spam') From d5d7b42183635a2cd49b1a3c7f84efed7827425a Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 12 Jun 2023 17:48:04 -0600 Subject: [PATCH 26/29] Implement the remaining tests. --- Lib/test/test_capi/test_misc.py | 213 ++++++++++++++++++++++++++------ Modules/_testinternalcapi.c | 74 ++++++----- 2 files changed, 212 insertions(+), 75 deletions(-) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index b7d769c81ffd57..8c717d72a994d0 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -1494,8 +1494,11 @@ def test_isolated_subinterpreter(self): # (eval breaker tripped) at each loop iteration # and at each call. + maxtext = 250 + main_interpid = 0 interpid = _interpreters.create() _interpreters.run_string(interpid, f"""if True: + import json import os import threading import time @@ -1510,75 +1513,211 @@ def create_pipe(): return r, w with self.subTest('add in main, run in subinterpreter'): - r_from_main, w_to_sub = create_pipe() - r_from_sub, w_to_main = create_pipe() + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() timeout = time.time() + 30 # seconds def do_work(): _interpreters.run_string(interpid, f"""if True: # Wait until this interp has handled the pending call. - while not os.read({r_from_main}, 1): - time.sleep(0.01) - if time.time() > {timeout}: - raise Exception('timed out!') + waiting = False + done = False + def wait(os_read=os.read): + global done, waiting + waiting = True + os_read({r_done}, 1) + done = True + t = threading.Thread(target=wait) + with threading_helper.start_threads([t]): + while not waiting: + pass + os.write({w_ready}, b'\\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > {timeout}: + raise Exception('timed out!') """) t = threading.Thread(target=do_work) with threading_helper.start_threads([t]): - # Add the pending call. - _testinternalcapi.pending_fd_identify(interpid, w_to_main) - # Wait for it to be done. - text = None - while not text: - if time.time() > timeout: - raise Exception('timed out!') - text = os.read(r_from_sub, 250) + os.read(r_ready, 1) + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify(interpid) # Signal the subinterpreter to stop. - os.write(w_to_sub, b'spam') - data = json.loads(text) + os.write(w_done, b'\0') - self.assertEqual(data['interpid'], int(interpid)) + self.assertEqual(actual, int(interpid)) with self.subTest('add in main, run in subinterpreter sub-thread'): - r_from_main, w_to_sub = create_pipe() - r_from_sub, w_to_main = create_pipe() + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() timeout = time.time() + 30 # seconds def do_work(): _interpreters.run_string(interpid, f"""if True: + waiting = False + done = False def subthread(): - # Wait until this interp has handled the pending call. - while not os.read({r_from_main}, 1): + while not waiting: + pass + os.write({w_ready}, b'\\0') + # Loop to trigger the eval breaker. + while not done: time.sleep(0.01) if time.time() > {timeout}: raise Exception('timed out!') t = threading.Thread(target=subthread) with threading_helper.start_threads([t]): - pass + # Wait until this interp has handled the pending call. + waiting = True + os.read({r_done}, 1) + done = True """) t = threading.Thread(target=do_work) with threading_helper.start_threads([t]): - # Add the pending call. - _testinternalcapi.pending_fd_identify(interpid, w_to_main) - # Wait for it to be done. - text = None - while not text: + os.read(r_ready, 1) + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify(interpid) + # Signal the subinterpreter to stop. + os.write(w_done, b'\0') + + self.assertEqual(actual, int(interpid)) + + with self.subTest('add in subinterpreter, run in main'): + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + r_data, w_data= create_pipe() + timeout = time.time() + 30 # seconds + + def add_job(): + os.read(r_ready, 1) + _interpreters.run_string(interpid, f"""if True: + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify({main_interpid}) + # Signal the subinterpreter to stop. + os.write({w_done}, b'\\0') + os.write({w_data}, actual.to_bytes(1, 'little')) + """) + # Wait until this interp has handled the pending call. + waiting = False + done = False + def wait(os_read=os.read): + nonlocal done, waiting + waiting = True + os_read(r_done, 1) + done = True + t1 = threading.Thread(target=add_job) + t2 = threading.Thread(target=wait) + with threading_helper.start_threads([t1, t2]): + while not waiting: + pass + os.write(w_ready, b'\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) if time.time() > timeout: raise Exception('timed out!') - text = os.read(r_from_sub, 250) - # Signal the subinterpreter to stop. - os.write(w_to_sub, b'spam') - data = json.loads(text) + text = os.read(r_data, 1) + actual = int.from_bytes(text, 'little') + + self.assertEqual(actual, int(main_interpid)) - self.assertEqual(data['interpid'], int(interpid)) + with self.subTest('add in subinterpreter, run in sub-thread'): + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + r_data, w_data= create_pipe() + timeout = time.time() + 30 # seconds + + def add_job(): + os.read(r_ready, 1) + _interpreters.run_string(interpid, f"""if True: + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify({main_interpid}) + # Signal the subinterpreter to stop. + os.write({w_done}, b'\\0') + os.write({w_data}, actual.to_bytes(1, 'little')) + """) + # Wait until this interp has handled the pending call. + waiting = False + done = False + def wait(os_read=os.read): + nonlocal done, waiting + waiting = True + os_read(r_done, 1) + done = True + def subthread(): + while not waiting: + pass + os.write(w_ready, b'\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > timeout: + raise Exception('timed out!') + t1 = threading.Thread(target=add_job) + t2 = threading.Thread(target=wait) + t3 = threading.Thread(target=subthread) + with threading_helper.start_threads([t1, t2, t3]): + pass + text = os.read(r_data, 1) + actual = int.from_bytes(text, 'little') + + self.assertEqual(actual, int(main_interpid)) + + # XXX We can't use the rest until gh-105716 is fixed. + return with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'): - pass + r_ready, w_ready = create_pipe() + r_done, w_done= create_pipe() + r_data, w_data= create_pipe() + timeout = time.time() + 30 # seconds - with self.subTest('add in subinterpreter, run in main'): - pass + def do_work(): + _interpreters.run_string(interpid, f"""if True: + waiting = False + done = False + def subthread(): + while not waiting: + pass + print('ready 2', flush=True) + os.write({w_ready}, b'\\0') + # Loop to trigger the eval breaker. + while not done: + time.sleep(0.01) + if time.time() > {timeout}: + raise Exception('timed out!') + print('done 2', flush=True) + t = threading.Thread(target=subthread) + with threading_helper.start_threads([t]): + # Wait until this interp has handled the pending call. + print('ready 1', flush=True) + waiting = True + os.read({r_done}, 1) + done = True + print('done 1', flush=True) + """) + t = threading.Thread(target=do_work) + #with threading_helper.start_threads([t]): + t.start() + if True: + os.read(r_ready, 1) + print('ready 3', flush=True) + _interpreters.run_string(interpid, f"""if True: + print('ready 4', flush=True) + # Add the pending call and wait for it to finish. + actual = _testinternalcapi.pending_identify({interpid}) + # Signal the subinterpreter to stop. + print('done 3', flush=True) + os.write({w_done}, b'\\0') + os.write({w_data}, actual.to_bytes(1, 'little')) + """) + t.join() + print('done 4', flush=True) + text = os.read(r_data, 1) + actual = int.from_bytes(text, 'little') - with self.subTest('add in subinterpreter, run in sub-thread'): - pass + self.assertEqual(actual, int(interpid)) class SubinterpreterTest(unittest.TestCase): diff --git a/Modules/_testinternalcapi.c b/Modules/_testinternalcapi.c index bff96b25e88074..3de32a32750ebc 100644 --- a/Modules/_testinternalcapi.c +++ b/Modules/_testinternalcapi.c @@ -892,49 +892,26 @@ pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs) } -struct _pending_fd_data { - int fileno; - const char *text; -}; +static struct { + int64_t interpid; +} pending_identify_result; -static int _pending_fd_callback(void *arg) +static int +_pending_identify_callback(void *arg) { - union { - int value; - void *voidptr; - } data; - data.voidptr = arg; - int fileno = data.value; - - /* Generate the text payload. */ + PyThread_type_lock mutex = (PyThread_type_lock)arg; + assert(pending_identify_result.interpid == -1); PyThreadState *tstate = PyThreadState_Get(); - int64_t interpid = PyInterpreterState_GetID(tstate->interp); - char buffer[256]; - snprintf(buffer, 256, "{\"interpid\": %ld, \"threadid\": %ld}", - interpid, tstate->thread_id); - - /* Call os.write(fileno, text). */ - PyObject *os = PyImport_ImportModule("os"); - assert(os != NULL); - PyObject *result = PyObject_CallMethod(os, "write", "iy", fileno, buffer); - if (result == NULL) { - return -1; - } - Py_DECREF(result); + pending_identify_result.interpid = PyInterpreterState_GetID(tstate->interp); + PyThread_release_lock(mutex); return 0; } static PyObject * -pending_fd_identify(PyObject *self, PyObject *args) +pending_identify(PyObject *self, PyObject *args) { PyObject *interpid; - union { - int value; - void *voidptr; - } fileno; - if (!PyArg_ParseTuple(args, "Oi:pending_fd_identify", - &interpid, &fileno.value)) - { + if (!PyArg_ParseTuple(args, "O:pending_identify", &interpid)) { return NULL; } PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid); @@ -945,15 +922,35 @@ pending_fd_identify(PyObject *self, PyObject *args) return NULL; } + pending_identify_result.interpid = -1; + + PyThread_type_lock mutex = PyThread_allocate_lock(); + if (mutex == NULL) { + return NULL; + } + PyThread_acquire_lock(mutex, WAIT_LOCK); + /* It gets released in _pending_identify_callback(). */ + int r; do { Py_BEGIN_ALLOW_THREADS - r = _PyEval_AddPendingCall(interp, &_pending_fd_callback, - fileno.voidptr, 0); + r = _PyEval_AddPendingCall(interp, + &_pending_identify_callback, (void *)mutex, + 0); Py_END_ALLOW_THREADS } while (r < 0); - Py_RETURN_NONE; + /* Wait for the pending call to complete. */ + PyThread_acquire_lock(mutex, WAIT_LOCK); + PyThread_release_lock(mutex); + PyThread_free_lock(mutex); + + PyObject *res = PyLong_FromLongLong(pending_identify_result.interpid); + pending_identify_result.interpid = -1; + if (res == NULL) { + return NULL; + } + return res; } @@ -989,7 +986,8 @@ static PyMethodDef module_functions[] = { {"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL}, {"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc), METH_VARARGS | METH_KEYWORDS}, - {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL}, +// {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL}, + {"pending_identify", pending_identify, METH_VARARGS, NULL}, {NULL, NULL} /* sentinel */ }; From 37d41cc4cdf571cf411e6b49ac04c1079da6e124 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 12 Jun 2023 17:49:27 -0600 Subject: [PATCH 27/29] Drop prints. --- Lib/test/test_capi/test_misc.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index 8c717d72a994d0..ef98f02443914c 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -1680,40 +1680,32 @@ def do_work(): def subthread(): while not waiting: pass - print('ready 2', flush=True) os.write({w_ready}, b'\\0') # Loop to trigger the eval breaker. while not done: time.sleep(0.01) if time.time() > {timeout}: raise Exception('timed out!') - print('done 2', flush=True) t = threading.Thread(target=subthread) with threading_helper.start_threads([t]): # Wait until this interp has handled the pending call. - print('ready 1', flush=True) waiting = True os.read({r_done}, 1) done = True - print('done 1', flush=True) """) t = threading.Thread(target=do_work) #with threading_helper.start_threads([t]): t.start() if True: os.read(r_ready, 1) - print('ready 3', flush=True) _interpreters.run_string(interpid, f"""if True: - print('ready 4', flush=True) # Add the pending call and wait for it to finish. actual = _testinternalcapi.pending_identify({interpid}) # Signal the subinterpreter to stop. - print('done 3', flush=True) os.write({w_done}, b'\\0') os.write({w_data}, actual.to_bytes(1, 'little')) """) t.join() - print('done 4', flush=True) text = os.read(r_data, 1) actual = int.from_bytes(text, 'little') From fc25a8580db5ce5b26f90628b9aa95bc891dfe43 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 12 Jun 2023 18:28:08 -0600 Subject: [PATCH 28/29] Ignore the global variable. --- Tools/c-analyzer/cpython/ignored.tsv | 1 + 1 file changed, 1 insertion(+) diff --git a/Tools/c-analyzer/cpython/ignored.tsv b/Tools/c-analyzer/cpython/ignored.tsv index 607976f5afdc68..87d9b39c16113b 100644 --- a/Tools/c-analyzer/cpython/ignored.tsv +++ b/Tools/c-analyzer/cpython/ignored.tsv @@ -517,6 +517,7 @@ Modules/_testcapimodule.c - g_type_watchers_installed - Modules/_testimportmultiple.c - _barmodule - Modules/_testimportmultiple.c - _foomodule - Modules/_testimportmultiple.c - _testimportmultiple - +Modules/_testinternalcapi.c - pending_identify_result - Modules/_testmultiphase.c - Example_Type_slots - Modules/_testmultiphase.c - Example_Type_spec - Modules/_testmultiphase.c - Example_methods - From 177f1618d07a494fcf585d7868629a111dd62de3 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Jun 2023 10:30:18 -0600 Subject: [PATCH 29/29] Clarify comments. --- ...-06-02-15-15-41.gh-issue-104812.dfZiG5.rst | 11 +++---- Python/ceval.c | 29 ++++++++++--------- Python/ceval_gil.c | 3 +- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst index 88f5ff926ad350..da29a8cae61839 100644 --- a/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst +++ b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst @@ -1,8 +1,9 @@ The "pending call" machinery now works for all interpreters, not just the main interpreter, and runs in all threads, not just the main thread. Some calls are still only done in the main thread, ergo in the main interpreter. -This change does not affect the existing public C-API -(``Py_AddPendingCall()``) which still only targets the main thread. The new -functionality is meant strictly for internal use. This change brings the -capability in line with the intention when the state was made -per-interpreter several years ago. +This change does not affect signal handling nor the existing public C-API +(``Py_AddPendingCall()``), which both still only target the main thread. +The new functionality is meant strictly for internal use for now, since +consequences of its use are not well understood yet outside some very +restricted cases. This change brings the capability in line with the +intention when the state was made per-interpreter several years ago. diff --git a/Python/ceval.c b/Python/ceval.c index b89c2c2a319b15..b91f94d2873963 100644 --- a/Python/ceval.c +++ b/Python/ceval.c @@ -772,20 +772,20 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, _PyInterpreterFrame *frame, int * - cyclic garbage collection * - GIL drop requests * - "async" exceptions - * - "pending calls" + * - "pending calls" (some only in the main thread) * - signal handling (only in the main thread) * * When the need for one of the above is detected, the eval loop - * calls _Py_HandlePending() (from ceval_gil.c). Then, if that - * didn't trigger an exception, the eval loop resumes executing + * pauses long enough to handle the detected case. Then, if doing + * so didn't trigger an exception, the eval loop resumes executing * the sequential instructions. * * To make this work, the eval loop periodically checks if any * of the above needs to happen. The individual checks can be * expensive if computed each time, so a while back we switched * to using pre-computed, per-interpreter variables for the checks, - * and later consolidated that to a single "eval breaker" variable. - * (See PyInterpreterState.ceval.eval_breaker in pycore_ceval_state.h.) + * and later consolidated that to a single "eval breaker" variable + * (now a PyInterpreterState field). * * For the longest time, the eval breaker check would happen * frequently, every 5 or so times through the loop, regardless @@ -802,16 +802,17 @@ _PyEval_EvalFrameDefault(PyThreadState *tstate, _PyInterpreterFrame *frame, int * * Currently, the eval breaker check happens here at the * "handle_eval_breaker" label. Some instructions come here - * explicitly (goto) and some indirectly via the CHECK_EVAL_BREAKER - * macro (see ceval_macros.h). Notably, the check happens at the - * end of the JUMP_BACKWARD instruction, which pretty much applies - * to all loops. The same applies to the CALL instruction and - * many (but not all) of the CALL_* instructions. - * See bytecodes.c for exact information. + * explicitly (goto) and some indirectly. Notably, the check + * happens on back edges in the control flow graph, which + * pretty much applies to all loops and most calls. + * (See bytecodes.c for exact information.) * - * One consequence of this approach is that it can be tricky - * to force any specific thread to pick up the eval breaker, - * or for any specific thread to not pick it up. + * One consequence of this approach is that it might not be obvious + * how to force any specific thread to pick up the eval breaker, + * or for any specific thread to not pick it up. Mostly this + * involves judicious uses of locks and careful ordering of code, + * while avoiding code that might trigger the eval breaker + * until so desired. */ if (_Py_HandlePending(tstate) != 0) { goto error; diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index ccd960c38faef5..bb1279f46cf9f7 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -867,7 +867,8 @@ _PyEval_AddPendingCall(PyInterpreterState *interp, int Py_AddPendingCall(int (*func)(void *), void *arg) { - /* Legacy users of this API will continue to target the main thread. */ + /* Legacy users of this API will continue to target the main thread + (of the main interpreter). */ PyInterpreterState *interp = _PyInterpreterState_Main(); return _PyEval_AddPendingCall(interp, func, arg, 1); }