8000 Set a default fmt on Queue objects. · python/cpython@1e6b6d2 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1e6b6d2

Browse files
Set a default fmt on Queue objects.
1 parent 2b891df commit 1e6b6d2

File tree

3 files changed

+112
-30
lines changed

3 files changed

+112
-30
lines changed

Lib/test/support/interpreters/queues.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,40 +32,46 @@ class QueueFull(_queues.QueueFull, queue.Full):
3232
"""
3333

3434

35-
def create(maxsize=0):
35+
_SHARED_ONLY = 0
36+
_PICKLED = 1
37+
38+
def create(maxsize=0, *, sharedonly=False):
3639
"""Return a new cross-interpreter queue.
3740
3841
The queue may be used to pass data safely between interpreters.
42+
43+
"sharedonly" sets the default for Queue.put() and Queue.put_nowait().
3944
"""
40-
qid = _queues.create(maxsize)
41-
return Queue(qid)
45+
fmt = _SHARED_ONLY if sharedonly else _PICKLED
46+
qid = _queues.create(maxsize, fmt)
47+
return Queue(qid, _fmt=fmt)
4248

4349

4450
def list_all():
4551
"""Return a list of all open queues."""
46-
return [Queue(qid)
47-
for qid in _queues.list_all()]
48-
52+
return [Queue(qid, _fmt=fmt)
53+
for qid, fmt in _queues.list_all()]
4954

50-
_SHARED_ONLY = 0
51-
_PICKLED = 1
5255

5356
_known_queues = weakref.WeakValueDictionary()
5457

5558
class Queue:
5659
"""A cross-interpreter queue."""
5760

58-
def __new__(cls, id, /):
61+
def __new__(cls, id, /, *, _fmt=None):
5962
# There is only one instance for any given ID.
6063
if isinstance(id, int):
6164
id = int(id)
6265
else:
6366
raise TypeError(f'id must be an int, got {id!r}')
67+
if _fmt is None:
68+
_fmt = _queues.get_default_fmt(id)
6469
try:
6570
self = _known_queues[id]
6671
except KeyError:
6772
self = super().__new__(cls)
6873
self._id = id
74+
self._fmt = _fmt
6975
_known_queues[id] = self
7076
_queues.bind(id)
7177
return self
@@ -108,18 +114,22 @@ def qsize(self):
108114
return _queues.get_count(self._id)
109115

110116
def put(self, obj, timeout=None, *,
111-
sharedonly=False,
117+
sharedonly=None,
112118
_delay=10 / 1000, # 10 milliseconds
113119
):
114120
"""Add the object to the queue.
115121
116122
This blocks while the queue is full.
117123
118124
If "sharedonly" is true then the object must be "shareable".
119-
It will be passed through the queue efficiently. Otherwise
125+
It will be passed through the queue efficiently. If false then
120126
all objects are supported, at the expense of worse performance.
127+
If None (the default) then it uses the queue's default.
121128
"""
122-
fmt = _SHARED_ONLY if sharedonly else _PICKLED
129+
if sharedonly is None:
130+
fmt = self._fmt
131+
else:
132+
fmt = _SHARED_ONLY if sharedonly else _PICKLED
123133
if timeout is not None:
124134
timeout = int(timeout)
125135
if timeout < 0:
@@ -138,7 +148,11 @@ def put(self, obj, timeout=None, *,
138148
else:
139149
break
140150

141-
def put_nowait(self, obj, *, sharedonly=False):
151+
def put_nowait(self, obj, *, sharedonly=None):
152+
if sharedonly is None:
153+
fmt = self._fmt
154+
else:
155+
fmt = _SHARED_ONLY if sharedonly else _PICKLED
142156
fmt = _SHARED_ONLY if sharedonly else _PICKLED
143157
if fmt is _PICKLED:
144158
obj = pickle.dumps(obj)

Lib/test/test_interpreters/test_queues.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,34 @@ def test_get_nowait(self):
251251
with self.assertRaises(queues.QueueEmpty):
252252
queue.get_nowait()
253253

254+
def test_put_get_default_sharedonly(self):
255+
expected = list(range(20))
256+
queue = queues.create(sharedonly=True)
257+
for i in range(20):
258+
queue.put(i)
259+
actual = [queue.get() for _ in range(20)]
260+
261+
self.assertEqual(actual, expected)
262+
263+
obj = [1, 2, 3] # lists are not shareable
264+
with self.assertRaises(interpreters.NotShareableError):
265+
queue.put(obj)
266+
267+
def test_put_get_default_not_sharedonly(self):
268+
expected = list(range(20))
269+
queue = queues.create(sharedonly=False)
270+
for i in range(20):
271+
queue.put(i)
272+
actual = [queue.get() for _ in range(20)]
273+
274+
self.assertEqual(actual, expected)
275+
276+
obj = [1, 2, 3] # lists are not shareable
277+
queue.put(obj)
278+
obj2 = queue.get()
279+
self.assertEqual(obj, obj2)
280+
self.assertIsNot(obj, obj2)
281+
254282
def test_put_get_same_interpreter(self):
255283
interp = interpreters.create()
256284
interp.exec_sync(dedent("""

Modules/_xxinterpqueuesmodule.c

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,11 @@ typedef struct _queue {
400400
_queueitem *first;
401401
_queueitem *last;
402402
} items;
403+
int fmt;
403404
} _queue;
404405

405406
static int
406-
_queue_init(_queue *queue, Py_ssize_t maxsize)
407+
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
407408
{
408409
PyThread_type_lock mutex = PyThread_allocate_lock();
409410
if (mutex == NULL) {
@@ -415,6 +416,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize)
415416
.items = {
416417
.maxsize = maxsize,
417418
},
419+
.fmt = fmt,
418420
};
419421
return 0;
420422
}
@@ -851,18 +853,26 @@ _queues_decref(_queues *queues, int64_t qid)
851853
PyThread_release_lock(queues->mutex);
852854
}
853855

854-
static int64_t *
856+
struct queue_id_and_fmt {
857+
int64_t id;
858+
int fmt;
859+
};
860+
861+
static struct queue_id_and_fmt *
855862
_queues_list_all(_queues *queues, int64_t *count)
856863
{
857-
int64_t *qids = NULL;
864+
struct queue_id_and_fmt *qids = NULL;
858865
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
859-
int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(queues->count));
866+
struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
867+
(Py_ssize_t)(queues->count));
860868
if (ids == NULL) {
861869
goto done;
862870
}
863871
_queueref *ref = queues->head;
864872
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
865-
ids[i] = ref->qid;
873+
ids[i].id = ref->qid;
874+
assert(ref->queue != NULL);
875+
ids[i].fmt = ref->queue->fmt;
866876
}
867877
*count = queues->count;
868878

@@ -898,13 +908,13 @@ _queue_free(_queue *queue)
898908

899909
// Create a new queue.
900910
static int64_t
901-
queue_create(_queues *queues, Py_ssize_t maxsize)
911+
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
902912
{
903913
_queue *queue = GLOBAL_MALLOC(_queue);
904914
if (queue == NULL) {
905915
return ERR_QUEUE_ALLOC;
906916
}
907-
int err = _queue_init(queue, maxsize);
917+
int err = _queue_init(queue, maxsize, fmt);
908918
if (err < 0) {
909919
GLOBAL_FREE(queue);
910920
return (int64_t)err;
@@ -1275,14 +1285,15 @@ qidarg_converter(PyObject *arg, void *ptr)
12751285
static PyObject *
12761286
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
12771287
{
1278-
static char *kwlist[] = {"maxsize", NULL};
1279-
Py_ssize_t maxsize = -1;
1280-
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n:create", kwlist,
1281-
&maxsize)) {
1288+
static char *kwlist[] = {"maxsize", "fmt", NULL};
1289+
Py_ssize_t maxsize;
1290+
int fmt;
1291+
if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
1292+
&maxsize, &fmt)) {
12821293
return NULL;
12831294
}
12841295

1285-
int64_t qid = queue_create(&_globals.queues, maxsize);
1296+
int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
12861297
if (qid < 0) {
12871298
(void)handle_queue_error((int)qid, self, qid);
12881299
return NULL;
@@ -1337,7 +1348,7 @@ static PyObject *
13371348
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
13381349
{
13391350
int64_t count = 0;
1340-
int64_t *qids = _queues_list_all(&_globals.queues, &count);
1351+
struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
13411352
if (qids == NULL) {
13421353
if (count == 0) {
13431354
return PyList_New(0);
@@ -1348,14 +1359,14 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
13481359
if (ids == NULL) {
13491360
goto finally;
13501361
}
1351-
int64_t *cur = qids;
1362+
struct queue_id_and_fmt *cur = qids;
13521363
for (int64_t i=0; i < count; cur++, i++) {
1353-
PyObject *qidobj = PyLong_FromLongLong(*cur);
1354-
if (qidobj == NULL) {
1364+
PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
1365+
if (item == NULL) {
13551366
Py_SETREF(ids, NULL);
13561367
break;
13571368
}
1358-
PyList_SET_ITEM(ids, (Py_ssize_t)i, qidobj);
1369+
PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
13591370
}
13601371

13611372
finally:
@@ -1512,6 +1523,33 @@ PyDoc_STRVAR(queuesmod_get_maxsize_doc,
15121523
\n\
15131524
Return the maximum number of items in the queue.");
15141525

1526+
static PyObject *
1527+
queuesmod_get_default_fmt(PyObject *self, PyObject *args, PyObject *kwds)
1528+
{
1529+
static char *kwlist[] = {"qid", NULL};
1530+
qidarg_converter_data qidarg;
1531+
if (!PyArg_ParseTupleAndKeywords(args, kwds,
1532+
"O&:get_default_fmt", kwlist,
1533+
qidarg_converter, &qidarg)) {
1534+
return NULL;
1535+
}
1536+
int64_t qid = qidarg.id;
1537+
1538+
_queue *queue = NULL;
1539+
int err = _queues_lookup(&_globals.queues, qid, &queue);
1540+
if (handle_queue_error(err, self, qid)) {
1541+
return NULL;
1542+
}
1543+
int fmt = queue->fmt;
1544+
_queue_unmark_waiter(queue, _globals.queues.mutex);
1545+
return PyLong_FromLong(fmt);
1546+
}
1547+
1548+
PyDoc_STRVAR(queuesmod_get_default_fmt_doc,
1549+
"get_default_fmt(qid)\n\
1550+
\n\
1551+
Return the default format to use for the queue.");
1552+
15151553
static PyObject *
15161554
queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
15171555
{
@@ -1606,6 +1644,8 @@ static PyMethodDef module_functions[] = {
16061644
METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
16071645
{"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize),
16081646
METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
1647+
{"get_default_fmt", _PyCFunction_CAST(queuesmod_get_default_fmt),
1648+
METH_VARARGS | METH_KEYWORDS, queuesmod_get_default_fmt_doc},
16091649
{"is_full", _PyCFunction_CAST(queuesmod_is_full),
16101650
METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
16111651
{"get_count", _PyCFunction_CAST(queuesmod_get_count),

0 commit comments

Comments
 (0)
0