8000 Queue decrefs · python/cpython@bd1c1a7 · GitHub
[go: up one dir, main page]

Skip to content

Commit bd1c1a7

Browse files
committed
Queue decrefs
1 parent ad58b75 commit bd1c1a7

File tree

3 files changed

+71
-15
lines changed

3 files changed

+71
-15
lines changed

Include/internal/pycore_pymem.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,13 @@ static inline void _PyObject_XDecRefDelayed(PyObject *obj)
132132
// Periodically process delayed free requests.
133133
extern void _PyMem_ProcessDelayed(PyThreadState *tstate);
134134

135+
136+
// Process delayed free requests while the world is stopped.
137+
// Notify the callback of any objects which should be freed.
138+
typedef void (*delayed_dealloc_cb)(PyObject *, void *);
139+
extern void _PyMem_ProcessDelayedNoDealloc(PyThreadState *tstate,
140+
delayed_dealloc_cb cb, void *state);
141+
135142
// Abandon all thread-local delayed free requests and push them to the
136143
// interpreter's queue.
137144
extern void _PyMem_AbandonDelayed(PyThreadState *tstate);

Objects/obmalloc.c

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,10 +1093,24 @@ struct _mem_work_chunk {
10931093
};
10941094

10951095
static void
1096-
free_work_item(uintptr_t ptr)
1096+
free_work_item(uintptr_t ptr, delayed_dealloc_cb cb, void *state)
10971097
{
10981098
if (ptr & 0x01) {
1099-
Py_DECREF((PyObject*)(char *)(ptr - 1));
1099+
PyObject *obj = (PyObject*)(char *)(ptr - 1);
1100+
#ifdef Py_GIL_DISABLED
1101+
if (cb == NULL) {
1102+
assert(!_PyInterpreterState_GET()->stoptheworld.world_stopped);
1103+
Py_DECREF(obj);
1104+
return;
1105+
}
1106+
1107+
Py_ssize_t refcount = _Py_ExplicitMergeRefcount(obj, -1);
1108+
if (refcount == 0) {
1109+
cb(obj, state);
1110+
}
1111+
#else
1112+
Py_DECREF(obj);
1113+
#endif
11001114
}
11011115
else {
11021116
PyMem_Free((void *)ptr);
@@ -1107,15 +1121,16 @@ static void
11071121
free_delayed(uintptr_t ptr)
11081122
{
11091123
#ifndef Py_GIL_DISABLED
1110-
free_work_item(ptr);
1124+
free_work_item(ptr, NULL, NULL);
11111125
#else
11121126
PyInterpreterState *interp = _PyInterpreterState_GET();
11131127
if (_PyInterpreterState_GetFinalizing(interp) != NULL ||
11141128
interp->stoptheworld.world_stopped)
11151129
{
11161130
// Free immediately during interpreter shutdown or if the world is
11171131
// stopped.
1118-
free_work_item(ptr);
1132+
assert(!interp->stoptheworld.world_stopped || !(ptr & 0x01));
1133+
free_work_item(ptr, NULL, NULL);
11191134
return;
11201135
}
11211136

@@ -1142,7 +1157,8 @@ free_delayed(uintptr_t ptr)
11421157
if (buf == NULL) {
11431158
// failed to allocate a buffer, free immediately
11441159
_PyEval_StopTheWorld(tstate->base.interp);
1145-
free_work_item(ptr);
1160+
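// TODO: Fix me. The world is stopped here, but free_work_item() with a NULL callback asserts that the world is not stopped when ptr is a tagged object pointer.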
1161+
free_work_item(ptr, NULL, NULL);
11461162
_PyEval_StartTheWorld(tstate->base.interp);
11471163
return;
11481164
}
@@ -1185,7 +1201,7 @@ work_queue_first(struct llist_node *head)
11851201

11861202
static void
11871203
process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
1188-
bool keep_empty)
1204+
bool keep_empty, delayed_dealloc_cb cb, void *state)
11891205
{
11901206
while (!llist_empty(head)) {
11911207
struct _mem_work_chunk *buf = work_queue_first(head);
@@ -1196,7 +1212,7 @@ process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
11961212
return;
11971213
}
11981214

1199-
free_work_item(item->ptr);
1215+
free_work_item(item->ptr, cb, state);
12001216
buf->rd_idx++;
12011217
}
12021218

@@ -1214,15 +1230,16 @@ process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
12141230

12151231
static void
12161232
process_interp_queue(struct _Py_mem_interp_free_queue *queue,
1217-
struct _qsbr_thread_state *qsbr)
1233+
struct _qsbr_thread_state *qsbr, delayed_dealloc_cb cb,
1234+
void *state)
12181235
{
12191236
if (!_Py_atomic_load_int_relaxed(&queue->has_work)) {
12201237
return;
12211238
}
12221239

12231240
// Try to acquire the lock, but don't block if it's already held.
12241241
if (_PyMutex_LockTimed(&queue->mutex, 0, 0) == PY_LOCK_ACQUIRED) {
1225-
process_queue(&queue->head, qsbr, false);
1242+
process_queue(&queue->head, qsbr, false, cb, state);
12261243

12271244
int more_work = !llist_empty(&queue->head);
12281245
_Py_atomic_store_int_relaxed(&queue->has_work, more_work);
@@ -1238,10 +1255,23 @@ _PyMem_ProcessDelayed(PyThreadState *tstate)
12381255
_PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate;
12391256

12401257
// Process thread-local work
1241-
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true);
1258+
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, NULL, NULL);
1259+
1260+
// Process shared interpreter work
1261+
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, NULL, NULL);
1262+
}
1263+
1264+
void
1265+
_PyMem_ProcessDelayedNoDealloc(PyThreadState *tstate, delayed_dealloc_cb cb, void *state)
1266+
{
1267+
PyInterpreterState *interp = tstate->interp;
1268+
_PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate;
1269+
1270+
// Process thread-local work
1271+
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, cb, state);
12421272

12431273
// Process shared interpreter work
1244-
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr);
1274+
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, cb, state);
12451275
}
12461276

12471277
void
@@ -1283,7 +1313,7 @@ _PyMem_FiniDelayed(PyInterpreterState *interp)
12831313
// Free the remaining items immediately. There should be no other
12841314
// threads accessing the memory at this point during shutdown.
12851315
struct _mem_work_item *item = &buf->array[buf->rd_idx];
1286-
free_work_item(item->ptr);
1316+
free_work_item(item->ptr, NULL, NULL);
12871317
buf->rd_idx++;
12881318
}
12891319

Python/gc_free_threading.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,25 @@ merge_queued_objects(_PyThreadStateImpl *tstate, struct collection_state *state)
417417
}
418418

419419
static void
420-
process_delayed_frees(PyInterpreterState *interp)
420+
queue_freed_object(PyObject *obj, void *arg)
421+
{
422+
struct collection_state *state = (struct collection_state *)arg;
423+
424+
// GC objects with zero refcount are handled subsequently by the
425+
// GC as if they were cyclic trash, but we have to handle dead
426+
// non-GC objects here. Add one to the refcount so that we can
427+
// decref and deallocate the object once we start the world again.
428+
if (!_PyObject_GC_IS_TRACKED(obj)) {
429+
obj->ob_ref_shared += (1 << _Py_REF_SHARED_SHIFT);
430+
#ifdef Py_REF_DEBUG
431+
_Py_IncRefTotal(_PyThreadState_GET());
432+
#endif
433+
worklist_push(&state->objs_to_decref, obj);
434+
}
435+
}
436+
437+
static void
438+
process_delayed_frees(PyInterpreterState *interp, struct collection_state *state)
421439
{
422440
// In STW status, we can observe the latest write sequence by
423441
// advancing the write sequence immediately.
@@ -426,8 +444,9 @@ process_delayed_frees(PyInterpreterState *interp)
426444
_Py_qsbr_quiescent_state(current_tstate->qsbr);
427445
HEAD_LOCK(&_PyRuntime);
428446
PyThreadState *tstate = interp->threads.head;
447+
429448
while (tstate != NULL) {
430-
_PyMem_ProcessDelayed(tstate);
449+
_PyMem_ProcessDelayedNoDealloc(tstate, queue_freed_object, state);
431450
tstate = (PyThreadState *)tstate->next;
432451
}
433452
HEAD_UNLOCK(&_PyRuntime);
@@ -1233,7 +1252,7 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state,
12331252
}
12341253
HEAD_UNLOCK(&_PyRuntime);
12351254

1236-
process_delayed_frees(interp);
1255+
process_delayed_frees(interp, state);
12371256

12381257
// Find unreachable objects
12391258
int err = deduce_unreachable_heap(interp, state);

0 commit comments

Comments
 (0)
0