8000 Add support for pickling. · python/cpython@2b891df · GitHub
[go: up one dir, main page]

Skip to content

Commit 2b891df

Browse files
Add support for pickling.
1 parent 681f316 commit 2b891df

File tree

2 files changed

+104
-39
lines changed

2 files changed

+104
-39
lines changed

Lib/test/support/interpreters/queues.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Cross-interpreter Queues High Level Module."""
22

3+
import pickle
34
import queue
45
import time
56
import weakref
@@ -47,6 +48,7 @@ def list_all():
4748

4849

4950
_SHARED_ONLY = 0
51+
_PICKLED = 1
5052

5153
_known_queues = weakref.WeakValueDictionary()
5254

@@ -106,18 +108,25 @@ def qsize(self):
106108
return _queues.get_count(self._id)
107109

108110
def put(self, obj, timeout=None, *,
111+
sharedonly=False,
109112
_delay=10 / 1000, # 10 milliseconds
110113
):
111114
"""Add the object to the queue.
112115
113116
This blocks while the queue is full.
117+
118+
If "sharedonly" is true then the object must be "shareable".
119+
It will be passed through the queue efficiently. Otherwise
120+
all objects are supported, at the expense of worse performance.
114121
"""
115-
fmt = _SHARED_ONLY
122+
fmt = _SHARED_ONLY if sharedonly else _PICKLED
116123
if timeout is not None:
117124
timeout = int(timeout)
118125
if timeout < 0:
119126
raise ValueError(f'timeout value must be non-negative')
120127
end = time.time() + timeout
128+
if fmt is _PICKLED:
129+
obj = pickle.dumps(obj)
121130
while True:
122131
try:
123132
_queues.put(self._id, obj, fmt)
@@ -129,10 +138,12 @@ def put(self, obj, timeout=None, *,
129138
else:
130139
break
131140

132-
def put_nowait(self, obj):
133-
fmt = _SHARED_ONLY
141+
def put_nowait(self, obj, *, sharedonly=False):
142+
fmt = _SHARED_ONLY if sharedonly else _PICKLED
143+
if fmt is _PICKLED:
144+
obj = pickle.dumps(obj)
134145
try:
135-
return _queues.put(self._id, obj, fmt)
146+
_queues.put(self._id, obj, fmt)
136147
except _queues.QueueFull as exc:
137148
exc.__class__ = QueueFull
138149
raise # re-raise
@@ -159,7 +170,10 @@ def get(self, timeout=None, *,
159170
time.sleep(_delay)
160171
else:
161172
break
162-
assert fmt == _SHARED_ONLY
173+
if fmt == _PICKLED:
174+
obj = pickle.loads(obj)
175+
else:
176+
assert fmt == _SHARED_ONLY
163177
return obj
164178

165179
def get_nowait(self):

Lib/test/test_interpreters/test_queues.py

Lines changed: 85 additions & 34 deletions
10000
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ def test_shareable(self):
5858

5959
with self.subTest('same interpreter'):
6060
queue2 = queues.create()
61-
queue1.put(queue2)
61+
queue1.put(queue2, sharedonly=True)
6262
queue3 = queue1.get()
6363
self.assertIs(queue3, queue2)
6464

6565
with self.subTest('from current interpreter'):
6666
queue4 = queues.create()
67-
queue1.put(queue4)
67+
queue1.put(queue4, sharedonly=True)
6868
out = _run_output(interp, dedent("""
6969
queue4 = queue1.get()
7070
print(queue4.id)
@@ -75,7 +75,7 @@ def test_shareable(self):
7575
with self.subTest('from subinterpreter'):
7676
out = _run_output(interp, dedent("""
7777
queue5 = queues.create()
78-
queue1.put(queue5)
78+
queue1.put(queue5, sharedonly=True)
7979
print(queue5.id)
8080
"""))
8181
qid = int(out)
@@ -118,7 +118,7 @@ class TestQueueOps(TestBase):
118118
def test_empty(self):
119119
queue = queues.create()
120120
before = queue.empty()
121-
queue.put(None)
121+
queue.put(None, sharedonly=True)
122122
during = queue.empty()
123123
queue.get()
124124
after = queue.empty()
@@ -133,7 +133,7 @@ def test_full(self):
133133
queue = queues.create(3)
134134
for _ in range(3):
135135
actual.append(queue.full())
136-
queue.put(None)
136+
queue.put(None, sharedonly=True)
137137
actual.append(queue.full())
138138
for _ in range(3):
139139
queue.get()
@@ -147,16 +147,16 @@ def test_qsize(self):
147147
queue = queues.create()
148148
for _ in range(3):
149149
actual.append(queue.qsize())
150-
queue.put(None)
150+
queue.put(None, sharedonly=True)
151151
actual.append(queue.qsize())
152152
queue.get()
153153
actual.append(queue.qsize())
154-
queue.put(None)
154+
queue.put(None, sharedonly=True)
155155
actual.append(queue.qsize())
156156
for _ in range(3):
157157
queue.get()
158158
actual.append(queue.qsize())
159-
queue.put(None)
159+
queue.put(None, sharedonly=True)
160160
actual.append(queue.qsize())
161161
queue.get()
162162
actual.append(queue.qsize())
@@ -165,30 +165,81 @@ def test_qsize(self):
165165

166166
def test_put_get_main(self):
167167
expected = list(range(20))
168-
queue = queues.create()
169-
for i in range(20):
170-
queue.put(i)
171-
actual = [queue.get() for _ in range(20)]
168+
for sharedonly in (True, False):
169+
kwds = dict(sharedonly=sharedonly)
170+
with self.subTest(f'sharedonly={sharedonly}'):
171+
queue = queues.create()
172+
for i in range(20):
173+
queue.put(i, **kwds)
174+
actual = [queue.get() for _ in range(20)]
172175

173-
self.assertEqual(actual, expected)
176+
self.assertEqual(actual, expected)
174177

175178
def test_put_timeout(self):
176-
queue = queues.create(2)
177-
queue.put(None)
178-
queue.put(None)
179-
with self.assertRaises(queues.QueueFull):
180-
queue.put(None, timeout=0.1)
181-
queue.get()
182-
queue.put(None)
179+
for sharedonly in (True, False):
180+
kwds = dict(sharedonly=sharedonly)
181+
with self.subTest(f'sharedonly={sharedonly}'):
182+
queue = queues.create(2)
183+
queue.put(None, **kwds)
184+
queue.put(None, **kwds)
185+
with self.assertRaises(queues.QueueFull):
186+
queue.put(None, timeout=0.1, **kwds)
187+
queue.get()
188+
queue.put(None, **kwds)
183189

184190
def test_put_nowait(self):
185-
queue = queues.create(2)
186-
queue.put_nowait(None)
187-
queue.put_nowait(None)
188-
with self.assertRaises(queues.QueueFull):
189-
queue.put_nowait(None)
190-
queue.get()
191-
queue.put_nowait(None)
191+
for sharedonly in (True, False):
192+
kwds = dict(sharedonly=sharedonly)
193+
with self.subTest(f'sharedonly={sharedonly}'):
194+
queue = queues.create(2)
195+
queue.put_nowait(None, **kwds)
196+
queue.put_nowait(None, **kwds)
197+
with self.assertRaises(queues.QueueFull):
198+
queue.put_nowait(None, **kwds)
199+
queue.get()
200+
queue.put_nowait(None, **kwds)
201+
202+
def test_put_sharedonly(self):
203+
for obj in [
204+
None,
205+
True,
206+
10,
207+
'spam',
208+
b'spam',
209+
(0, 'a'),
210+
]:
211+
with self.subTest(repr(obj)):
212+
queue = queues.create()
213+
queue.put(obj, sharedonly=True)
214+
obj2 = queue.get()
215+
self.assertEqual(obj2, obj)
216+
217+
for obj in [
218+
[1, 2, 3],
219+
{'a': 13, 'b': 17},
220+
]:
221+
with self.subTest(repr(obj)):
222+
queue = queues.create()
223+
with self.assertRaises(interpreters.NotShareableError):
224+
queue.put(obj, sharedonly=True)
225+
226+
def test_put_not_sharedonly(self):
227+
for obj in [
228+
None,
229+
True,
230+
10,
231+
'spam',
232+
b'spam',
233+
(0, 'a'),
234+
# not shareable
235+
[1, 2, 3],
236+
{'a': 13, 'b': 17},
237+
]:
238+
with self.subTest(repr(obj)):
239+
queue = queues.create()
240+
queue.put(obj, sharedonly=False)
241+
obj2 = queue.get()
242+
self.assertEqual(obj2, obj)
192243

193244
def test_get_timeout(self):
194245
queue = queues.create()
@@ -206,7 +257,7 @@ def test_put_get_same_interpreter(self):
206257
from test.support.interpreters import queues
207258
queue = queues.create()
208259
orig = b'spam'
209-
queue.put(orig)
260+
queue.put(orig, sharedonly=True)
210261
obj = queue.get()
211262
assert obj == orig, 'expected: obj == orig'
212263
assert obj is not orig, 'expected: obj is not orig'
@@ -219,7 +270,7 @@ def test_put_get_different_interpreters(self):
219270
self.assertEqual(len(queues.list_all()), 2)
220271

221272
obj1 = b'spam'
222-
queue1.put(obj1)
273+
queue1.put(obj1, sharedonly=True)
223274

224275
out = _run_output(
225276
interp,
@@ -236,7 +287,7 @@ def test_put_get_different_interpreters(self):
236287
obj2 = b'eggs'
237288
print(id(obj2))
238289
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
239-
queue2.put(obj2)
290+
queue2.put(obj2, sharedonly=True)
240291
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
241292
"""))
242293
self.assertEqual(len(queues.list_all()), 2)
@@ -258,8 +309,8 @@ def test_put_cleared_with_subinterpreter(self):
258309
queue = queues.Queue({queue.id})
259310
obj1 = b'spam'
260311
obj2 = b'eggs'
261-
queue.put(obj1)
262-
queue.put(obj2)
312+
queue.put(obj1, sharedonly=True)
313+
queue.put(obj2, sharedonly=True)
263314
"""))
264315
self.assertEqual(queue.qsize(), 2)
265316

@@ -281,12 +332,12 @@ def f():
281332
break
282333
except queues.QueueEmpty:
283334
continue
284-
queue2.put(obj)
335+
queue2.put(obj, sharedonly=True)
285336
t = threading.Thread(target=f)
286337
t.start()
287338

288339
orig = b'spam'
289-
queue1.put(orig)
340+
queue1.put(orig, sharedonly=True)
290341
obj = queue2.get()
291342
t.join()
292343

0 commit comments

Comments
 (0)
0