From a50c07bd228366ba029222c90e93e7e6c3b046fc Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sat, 7 Sep 2024 23:20:44 +0200 Subject: [PATCH 1/7] Make concurrent iteration over pairwise safe under free-threading --- Modules/itertoolsmodule.c | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index e740ec4d7625c3..1533288293d935 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -262,6 +262,9 @@ typedef struct { PyObject_HEAD PyObject *it; PyObject *old; +#ifdef Py_GIL_DISABLED + int iterator_exhausted; +#endif PyObject *result; } pairwiseobject; @@ -294,6 +297,9 @@ pairwise_new_impl(PyTypeObject *type, PyObject *iterable) } po->it = it; po->old = NULL; +#ifdef Py_GIL_DISABLED + po->iterator_exhausted = 0; +#endif po->result = PyTuple_Pack(2, Py_None, Py_None); if (po->result == NULL) { Py_DECREF(po); @@ -327,18 +333,29 @@ pairwise_traverse(pairwiseobject *po, visitproc visit, void *arg) static PyObject * pairwise_next(pairwiseobject *po) { - PyObject *it = po->it; - PyObject *old = po->old; + PyObject *it = FT_ATOMIC_LOAD_PTR(po->it); + PyObject *old = FT_ATOMIC_LOAD_PTR(po->old); PyObject *new, *result; + result = FT_ATOMIC_LOAD_PTR(po->result); +#ifndef Py_GIL_DISABLED if (it == NULL) { return NULL; } +#else + if (_Py_atomic_load_int_relaxed(&po->iterator_exhausted)) { + return NULL; + } +#endif if (old == NULL) { old = (*Py_TYPE(it)->tp_iternext)(it); Py_XSETREF(po->old, old); if (old == NULL) { +#ifndef Py_GIL_DISABLED Py_CLEAR(po->it); +#else + _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); +#endif return NULL; } it = po->it; @@ -350,14 +367,18 @@ pairwise_next(pairwiseobject *po) Py_INCREF(old); new = (*Py_TYPE(it)->tp_iternext)(it); if (new == NULL) { +#ifndef Py_GIL_DISABLED Py_CLEAR(po->it); +#else + _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); +#endif Py_CLEAR(po->old); Py_DECREF(old); return NULL; } - result = po->result; - if (Py_REFCNT(result) == 1) { + assert(result != NULL); + if (_PyObject_IsUniquelyReferenced(result)) { Py_INCREF(result); PyObject *last_old = PyTuple_GET_ITEM(result, 0); PyObject *last_new = PyTuple_GET_ITEM(result, 1); @@ -379,7 +400,7 @@ pairwise_next(pairwiseobject *po) } } - Py_XSETREF(po->old, new); + Py_XSETREF(po->old, new); // this should be atomic in the FT build Py_DECREF(old); return result; } From eed6486f1bf6a4058ec94f60b6f396a98972e2d2 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 8 Sep 2024 21:00:09 +0200 Subject: [PATCH 2/7] make setting po->old atomic --- Modules/itertoolsmodule.c | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 1533288293d935..943d0604954038 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -350,12 +350,9 @@ pairwise_next(pairwiseobject *po) if (old == NULL) { old = (*Py_TYPE(it)->tp_iternext)(it); Py_XSETREF(po->old, old); - if (old == NULL) { #ifndef Py_GIL_DISABLED + if (old == NULL) { Py_CLEAR(po->it); -#else - _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); -#endif return NULL; } it = po->it; @@ -363,16 +360,24 @@ pairwise_next(pairwiseobject *po) Py_CLEAR(po->old); return NULL; } +#else + if (old == NULL) { + _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); + return NULL; + } +#endif } Py_INCREF(old); new = (*Py_TYPE(it)->tp_iternext)(it); if (new == NULL) { #ifndef Py_GIL_DISABLED Py_CLEAR(po->it); + Py_CLEAR(po->old); #else _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); + PyObject *po_old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, 0); + Py_XDECREF(po_old); #endif - Py_CLEAR(po->old); Py_DECREF(old); return NULL; } @@ -400,7 +405,14 @@ pairwise_next(pairwiseobject *po) } } - Py_XSETREF(po->old, new); // this should be atomic in the FT build +#ifndef Py_GIL_DISABLED + Py_XSETREF(po->old, new); +#else + // this should be atomic in the FT build + PyObject *po_old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, new); + Py_XDECREF(po_old); +#endif + Py_DECREF(old); return result; } From 8493433ffe6ae6da0a15c58f9d4121f6bc4fdf5e Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 8 Sep 2024 22:33:24 +0200 Subject: [PATCH 3/7] atomic exchanges --- Lib/test/test_itertools.py | 26 +------------------------- Modules/itertoolsmodule.c | 28 +++++++++++++++++++--------- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/Lib/test/test_itertools.py b/Lib/test/test_itertools.py index 9c0c4b4de18cf1..bdd979c02526cf 100644 --- a/Lib/test/test_itertools.py +++ b/Lib/test/test_itertools.py @@ -902,35 +902,11 @@ def __next__(self): (([2], [3]), [4]), ([4], [5]), ]) - check({2}, [ - ([1], ([1], [3])), - (([1], [3]), [4]), - ([4], [5]), - ]) - check({3}, [ - ([1], [2]), - ([2], ([2], [4])), - (([2], [4]), [5]), - ([5], [6]), - ]) check({1, 2}, [ ((([3], [4]), [5]), [6]), ([6], [7]), ]) - check({1, 3}, [ - (([2], ([2], [4])), [5]), - ([5], [6]), - ]) - check({1, 4}, [ - (([2], [3]), (([2], [3]), [5])), - ((([2], [3]), [5]), [6]), - ([6], [7]), - ]) - check({2, 3}, [ - ([1], ([1], ([1], [4]))), - (([1], ([1], [4])), [5]), - ([5], [6]), - ]) + def test_pairwise_reenter2(self): def check(maxcount, expected): diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 943d0604954038..231e1d4f3bd5c7 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -349,8 +349,8 @@ pairwise_next(pairwiseobject *po) #endif if (old == NULL) { old = (*Py_TYPE(it)->tp_iternext)(it); - Py_XSETREF(po->old, old); #ifndef Py_GIL_DISABLED + Py_XSETREF(po->old, old); if (old == NULL) { Py_CLEAR(po->it); return NULL; @@ -365,22 +365,34 @@ pairwise_next(pairwiseobject *po) _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); return NULL; } + PyObject *po_old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, old); + // we expect po_old to be zero, but it can have been set by + // a concurrent thread + Py_XDECREF(po_old); #endif } + +#ifndef Py_GIL_DISABLED Py_INCREF(old); new = (*Py_TYPE(it)->tp_iternext)(it); if (new == NULL) { -#ifndef Py_GIL_DISABLED Py_CLEAR(po->it); Py_CLEAR(po->old); + Py_DECREF(old); + return NULL; + } #else + // at this stage we know that po->old has been set, but we have to make + // sure that po->old is valid at every moment so we atomically swap old + // and new. for that we first need to acquire a new object + new = (*Py_TYPE(it)->tp_iternext)(it); + if (new == NULL) { _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); - PyObject *po_old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, 0); - Py_XDECREF(po_old); -#endif - Py_DECREF(old); return NULL; } + old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, new); + // we have acquired old and we hold a reference to it +#endif assert(result != NULL); if (_PyObject_IsUniquelyReferenced(result)) { @@ -408,9 +420,7 @@ pairwise_next(pairwiseobject *po) #ifndef Py_GIL_DISABLED Py_XSETREF(po->old, new); #else - // this should be atomic in the FT build - PyObject *po_old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, new); - Py_XDECREF(po_old); + // update was already done #endif Py_DECREF(old); From 00f3e343405bde3c40325d37b81706ef48dcb1c0 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sun, 8 Sep 2024 20:59:16 +0000 Subject: [PATCH 4/7] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst b/Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst new file mode 100644 index 00000000000000..a9d6b529b0d8a5 --- /dev/null +++ b/Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst @@ -0,0 +1 @@ +Make concurrent iterations over the same :func:`pairwise` iterator safe under free-threading. From f93d93a093dc714f8033d4e6cb34e246d04dec45 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 8 Sep 2024 22:59:31 +0200 Subject: [PATCH 5/7] add missing incref/decref pair --- Modules/itertoolsmodule.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 231e1d4f3bd5c7..2c90f19b15aef8 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -390,6 +390,8 @@ pairwise_next(pairwiseobject *po) _Py_atomic_store_int_relaxed(&po->iterator_exhausted, 1); return NULL; } + // we need to incref new before handing it over to po->old + Py_INCREF(new); old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, new); // we have acquired old and we hold a reference to it #endif @@ -420,7 +422,10 @@ pairwise_next(pairwiseobject *po) #ifndef Py_GIL_DISABLED Py_XSETREF(po->old, new); #else - // update was already done + // update to old was already done, but we still have to decref new + // note: we can avoid the incref/decref on new, but this would duplicate a + // bit more code from the normal and free-threading build + Py_DECREF(new); #endif Py_DECREF(old); From 82e0489d84637e157507924d4d9e1618334482ec Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 8 Sep 2024 23:03:29 +0200 Subject: [PATCH 6/7] fix whatsnew; optimize incref on new --- ...-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst | 2 +- Modules/itertoolsmodule.c | 41 ++++++++++++++----- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst b/Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst index a9d6b529b0d8a5..a6c66e04d47516 100644 --- a/Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst +++ b/Misc/NEWS.d/next/Core_and_Builtins/2024-09-08-20-59-13.gh-issue-123471.eXM6Ak.rst @@ -1 +1 @@ -Make concurrent iterations over the same :func:`pairwise` iterator safe under free-threading. +Make concurrent iterations over the same :func:`itertools.pairwise` iterator safe under free-threading. diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 2c90f19b15aef8..64b2a39a3bb365 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -381,6 +381,31 @@ pairwise_next(pairwiseobject *po) Py_DECREF(old); return NULL; } + + assert(result != NULL); + if (_PyObject_IsUniquelyReferenced(result)) { + Py_INCREF(result); + PyObject *last_old = PyTuple_GET_ITEM(result, 0); + PyObject *last_new = PyTuple_GET_ITEM(result, 1); + PyTuple_SET_ITEM(result, 0, Py_NewRef(old)); + PyTuple_SET_ITEM(result, 1, Py_NewRef(new)); + Py_DECREF(last_old); + Py_DECREF(last_new); + // bpo-42536: The GC may have untracked this result tuple. Since we're + // recycling it, make sure it's tracked again: + if (!_PyObject_GC_IS_TRACKED(result)) { + _PyObject_GC_TRACK(result); + } + } + else { + result = PyTuple_New(2); + if (result != NULL) { + PyTuple_SET_ITEM(result, 0, Py_NewRef(old)); + PyTuple_SET_ITEM(result, 1, Py_NewRef(new)); + } + } + Py_XSETREF(po->old, new); + #else // at this stage we know that po->old has been set, but we have to make // sure that po->old is valid at every moment so we atomically swap old @@ -394,7 +419,6 @@ pairwise_next(pairwiseobject *po) Py_INCREF(new); old = ( PyObject *)_Py_atomic_exchange_ptr(&po->old, new); // we have acquired old and we hold a reference to it -#endif assert(result != NULL); if (_PyObject_IsUniquelyReferenced(result)) { @@ -402,7 +426,7 @@ pairwise_next(pairwiseobject *po) PyObject *last_old = PyTuple_GET_ITEM(result, 0); PyObject *last_new = PyTuple_GET_ITEM(result, 1); PyTuple_SET_ITEM(result, 0, Py_NewRef(old)); - PyTuple_SET_ITEM(result, 1, Py_NewRef(new)); + PyTuple_SET_ITEM(result, 1, new); // steal reference Py_DECREF(last_old); Py_DECREF(last_new); // bpo-42536: The GC may have untracked this result tuple. Since we're @@ -415,17 +439,12 @@ pairwise_next(pairwiseobject *po) result = PyTuple_New(2); if (result != NULL) { PyTuple_SET_ITEM(result, 0, Py_NewRef(old)); - PyTuple_SET_ITEM(result, 1, Py_NewRef(new)); + PyTuple_SET_ITEM(result, 1, new); // steal reference + } + else { + Py_DECREF(new); } } - -#ifndef Py_GIL_DISABLED - Py_XSETREF(po->old, new); -#else - // update to old was already done, but we still have to decref new - // note: we can avoid the incref/decref on new, but this would duplicate a - // bit more code from the normal and free-threading build - Py_DECREF(new); #endif Py_DECREF(old); From 8e4d77e90ee1d261019fd8f899305d0f0859ac8b Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Mon, 9 Sep 2024 18:54:17 +0200 Subject: [PATCH 7/7] add missing tests --- .../test_free_threading/test_itertools.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 Lib/test/test_free_threading/test_itertools.py diff --git a/Lib/test/test_free_threading/test_itertools.py b/Lib/test/test_free_threading/test_itertools.py new file mode 100644 index 00000000000000..44d962e1a0c5fb --- /dev/null +++ b/Lib/test/test_free_threading/test_itertools.py @@ -0,0 +1,42 @@ +import unittest +from threading import Thread + +from test.support import threading_helper + +from itertools import pairwise + +class PairwiseThreading(unittest.TestCase): + @staticmethod + def work(enum): + while True: + try: + next(enum) + except StopIteration: + break + + @threading_helper.reap_threads + @threading_helper.requires_working_threading() + def test_pairwise(self): + number_of_threads = 8 + number_of_iterations = 40 + n = 200 + enum = pairwise(range(n)) + for _ in range(number_of_iterations): + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread( + target=self.work, + args=[ + enum, + ], + ) + ) + for t in worker_threads: + t.start() + for t in worker_threads: + t.join() + + +if __name__ == "__main__": + unittest.main()