8000 demonstrate blosc multi-processing issue · sailfish009/zarr-python@517f8f8 · GitHub
[go: up one dir, main page]

Skip to content

Commit 517f8f8

Browse files
committed
demonstrate blosc multi-processing issue
1 parent 7f259a8 commit 517f8f8

File tree

1 file changed

+229
-0
lines changed

1 file changed

+229
-0
lines changed

notebooks/blosc-multiprocessing.ipynb

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": 1,
6+
"metadata": {},
7+
"outputs": [],
8+
"source": [
9+
"import os"
10+
]
11+
},
12+
{
13+
"cell_type": "code",
14+
"execution_count": 2,
15+
"metadata": {},
16+
"outputs": [
17+
{
18+
"name": "stdout",
19+
"output_type": "stream",
20+
"text": [
21+
"parent process: 5710\n"
22+
]
23+
}
24+
],
25+
"source": [
26+
"print('parent process:', os.getpid())"
27+
]
28+
},
29+
{
30+
"cell_type": "code",
31+
"execution_count": 3,
32+
"metadata": {},
33+
"outputs": [],
34+
"source": [
35+
"import numpy as np"
36+
]
37+
},
38+
{
39+
"cell_type": "code",
40+
"execution_count": 4,
41+
"metadata": {},
42+
"outputs": [
43+
{
44+
"data": {
45+
"text/plain": [
46+
"'0.2.1'"
47+
]
48+
},
49+
"execution_count": 4,
50+
"metadata": {},
51+
"output_type": "execute_result"
52+
}
53+
],
54+
"source": [
55+
"import numcodecs\n",
56+
"numcodecs.__version__"
57+
]
58+
},
59+
{
60+
"cell_type": "code",
61+
"execution_count": 6,
62+
"metadata": {},
63+
"outputs": [
64+
{
65+
"data": {
66+
"text/plain": [
67+
"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
68+
]
69+
},
70+
"execution_count": 6,
71+
"metadata": {},
72+
"output_type": "execute_result"
73+
}
74+
],
75+
"source": [
76+
"compressor = numcodecs.Blosc()\n",
77+
"compressor"
78+
]
79+
},
80+
{
81+
"cell_type": "code",
82+
"execution_count": 7,
83+
"metadata": {},
84+
"outputs": [],
85+
"source": [
86+
"data = np.arange(1000000)\n",
87+
"enc = compressor.encode(data)"
88+
]
89+
},
90+
{
91+
"cell_type": "code",
92+
"execution_count": 8,
93+
"metadata": {},
94+
"outputs": [],
95+
"source": [
96+
"def worker(task):\n",
97+
" pid = os.getpid()\n",
98+
" print('[%s:%s] begin' % (task, pid))\n",
99+
" dec = np.frombuffer(compressor.decode(enc), dtype=data.dtype)[task:task+10]\n",
100+
" print('[%s:%s] decoded %s' % (task, pid, dec))"
101+
]
102+
},
103+
{
104+
"cell_type": "code",
105+
"execution_count": 9,
106+
"metadata": {},
107+
"outputs": [
108+
{
109+
"name": "stdout",
110+
"output_type": "stream",
111+
"text": [
112+
"[0:5710] begin\n",
113+
"[0:5710] decoded [0 1 2 3 4 5 6 7 8 9]\n"
114+
]
115+
}
116+
],
117+
"source": [
118+
"worker(0)"
119+
]
120+
},
121+
{
122+
"cell_type": "code",
123+
"execution_count": 10,
124+
"metadata": {},
125+
"outputs": [
126+
{
127+
"name": "stdout",
128+
"output_type": "stream",
129+
"text": [
130+
"[3:5741] begin\n",
131+
"[2:5740] begin\n",
132+
"[1:5739] begin\n",
133+
"[4:5742] begin\n",
134+
"[0:5738] begin\n"
135+
]
136+
},
137+
{
138+
"ename": "KeyboardInterrupt",
139+
"evalue": "",
140+
"output_type": "error",
141+
"traceback": [
142+
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
143+
"\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)",
144+
"\u001b[0;32m<ipython-input-10-e06b05d04f67>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mmultiprocessing\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0mpool\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mmultiprocessing\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPool\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mprocesses\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m5\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 3\u001b[0;31m \u001b[0mpool\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmap\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mworker\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrange\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m5\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
145+
"\u001b[0;32m/usr/lib/python3.6/multiprocessing/pool.py\u001b[0m in \u001b[0;36mmap\u001b[0;34m(self, func, iterable, chunksize)\u001b[0m\n\u001b[1;32m 258\u001b[0m \u001b[0;32min\u001b[0m \u001b[0ma\u001b[0m \u001b[0mlist\u001b[0m \u001b[0mthat\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0mreturned\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 259\u001b[0m '''\n\u001b[0;32m--> 260\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_map_async\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0miterable\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmapstar\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mchunksize\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 261\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 262\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mstarmap\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0miterable\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mchunksize\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
146+
"\u001b[0;32m/usr/lib/python3.6/multiprocessing/pool.py\u001b[0m in \u001b[0;36mget\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 600\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 601\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtimeout\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 602\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 603\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mready\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 604\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mTimeoutError\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
147+
"\u001b[0;32m/usr/lib/python3.6/multiprocessing/pool.py\u001b[0m in \u001b[0;36mwait\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 597\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 598\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtimeout\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 599\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_event\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 600\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 601\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtimeout\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
148+
"\u001b[0;32m/usr/lib/python3.6/threading.py\u001b[0m in \u001b[0;36mwait\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 549\u001b[0m \u001b[0msignaled\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_flag\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 550\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0msignaled\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 551\u001b[0;31m \u001b[0msignaled\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_cond\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 552\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0msignaled\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 553\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
149+
"\u001b[0;32m/usr/lib/python3.6/threading.py\u001b[0m in \u001b[0;36mwait\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 293\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;31m# restore state no matter what (e.g., KeyboardInterrupt)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 294\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtimeout\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 295\u001b[0;31m \u001b[0mwaiter\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0macquire\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 296\u001b[0m \u001b[0mgotit\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 297\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
150+
"\u001b[0;31mKeyboardInterrupt\u001b[0m: "
151+
]
152+
}
153+
],
154+
"source": [
155+
"import multiprocessing\n",
156+
"pool = multiprocessing.Pool(processes=5)\n",
157+
"pool.map(worker, range(5))\n",
158+
"pool.close()\n",
159+
"pool.join()"
160+
]
161+
},
162+
{
163+
"cell_type": "code",
164+
"execution_count": 11,
165+
"metadata": {},
166+
"outputs": [],
167+
"source": [
168+
"numcodecs.blosc.use_threads = False"
169+
]
170+
},
171+
{
172+
"cell_type": "code",
173+
"execution_count": 12,
174+
"metadata": {},
175+
"outputs": [
176+
{
177+
"name": "stdout",
178+
"output_type": "stream",
179+
"text": [
180+
"[4:5820] begin\n",
181+
"[2:5818] begin\n",
182+
"[0:5816] begin\n",
183+
"[3:5819] begin\n",
184+
"[1:5817] begin\n",
185+
"[1:5817] decoded [ 1 2 3 4 5 6 7 8 9 10]\n",
186+
"[3:5819] decoded [ 3 4 5 6 7 8 9 10 11 12]\n",
187+
"[0:5816] decoded [0 1 2 3 4 5 6 7 8 9]\n",
188+
"[2:5818] decoded [ 2 3 4 5 6 7 8 9 10 11]\n",
189+
"[4:5820] decoded [ 4 5 6 7 8 9 10 11 12 13]\n"
190+
]
191+
}
192+
],
193+
"source": [
194+
"pool = multiprocessing.Pool(processes=5)\n",
195+
"pool.map(worker, range(5))\n",
196+
"pool.close()\n",
197+
"pool.join()"
198+
]
199+
},
200+
{
201+
"cell_type": "code",
202+
"execution_count": null,
203+
"metadata": {},
204+
"outputs": [],
205+
"source": []
206 B787 +
}
207+
],
208+
"metadata": {
209+
"kernelspec": {
210+
"display_name": "Python 3",
211+
"language": "python",
212+
"name": "python3"
213+
},
214+
"language_info": {
215+
"codemirror_mode": {
216+
"name": "ipython",
217+
"version": 3
218+
},
219+
"file_extension": ".py",
220+
"mimetype": "text/x-python",
221+
"name": "python",
222+
"nbconvert_exporter": "python",
223+
"pygments_lexer": "ipython3",
224+
"version": "3.6.1"
225+
}
226+
},
227+
"nbformat": 4,
228+
"nbformat_minor": 2
229+
}

0 commit comments

Comments
 (0)
0