-
-
Notifications
You must be signed in to change notification settings - Fork 357
Closed
Description
I'm trying to implement an async version of zip
using the following code, but I'm getting an error when one of the iterators passed to async_zip
raises a StopAsyncIteration
before the others are finished. Normally I'd expect that I can catch it using the multierror or a try/except, however, instead I'm getting a RuntimeError
.
I don't understand why this is happening and it's not clear if this is a bug in my code or in trio.
import trio
class async_zip(object):
def __init__(self, *largs):
self.nexts = [obj.__anext__ for obj in largs]
async def _accumulate(self, f, items, i):
items[i] = await f()
def __aiter__(self):
return self
async def __anext__(self):
nexts = self.nexts
items = [None, ] * len(nexts)
got_stop = False
def handle(exc):
nonlocal got_stop
if isinstance(exc, StopAsyncIteration):
got_stop = True
return None
else:
return exc
with trio.MultiError.catch(handle):
async with trio.open_nursery() as nursery:
for i, f in enumerate(nexts):
nursery.start_soon(self._accumulate, f, items, i)
if got_stop:
raise StopAsyncIteration
return items
# an async iterable
class it(object):
def __init__(self, count):
self.count = count
self.val = 0
def __aiter__(self):
return self
async def __anext__(self):
await trio.sleep(1)
val = self.val
if val >= self.count:
raise StopAsyncIteration
self.val += 1
return val
# test the iterable
async def runner():
async for val in it(4):
print('got', val)
# let all iterables finish together
async def zipper():
async for vals in async_zip(it(4), it(4), it(4), it(4)):
print('got', vals)
# have one iterable finish early
async def zipper_short():
async for vals in async_zip(it(4), it(4), it(4), it(2)):
print('got', vals)
# run them
trio.run(runner)
trio.run(zipper)
trio.run(zipper_short)
This prints
got 0
got 1
got 2
got 3
got [0, 0, 0, 0]
got [1, 1, 1, 1]
got [2, 2, 2, 2]
got [3, 3, 3, 3]
got [0, 0, 0, 0]
got [1, 1, 1, 1]
Traceback (most recent call last):
File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 199, in _invoke
result = fn(*args)
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 320, in open_nursery
await nursery._clean_up(pending_exc)
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 519, in _clean_up
raise mexc
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1502, in run_impl
msg = task.coro.send(next_send)
File "<ipython-input-2-e43ada780e94>", line 9, in _accumulate
items[i] = await f()
File "<ipython-input-2-e43ada780e94>", line 46, in __anext__
raise StopAsyncIteration
StopAsyncIteration
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "E:\Python\Python35-x64\lib\site-packages\IPython\core\interactiveshell.py", line 2881, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-2-e43ada780e94>", line 64, in <module>
trio.run(zipper_short)
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1397, in run
return result.unwrap()
File "E:\Python\Python35-x64\lib\contextlib.py", line 77, in __exit__
self.gen.throw(type, value, traceback)
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_ki.py", line 194, in ki_manager
yield
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1397, in run
return result.unwrap()
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_result.py", line 119, in unwrap
raise self.error
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1502, in run_impl
msg = task.coro.send(next_send)
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1085, in init
return system_nursery._reap_and_unwrap(task)
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 447, in _reap_and_unwrap
return task._result.unwrap()
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_result.py", line 119, in unwrap
raise self.error
File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1502, in run_impl
msg = task.coro.send(next_send)
File "<ipython-input-2-e43ada780e94>", line 59, in zipper_short
async for vals in async_zip(it(4), it(4), it(4), it(2)):
File "<ipython-input-2-e43ada780e94>", line 30, in __anext__
nursery.start_soon(self._accumulate, f, items, i)
File "E:\Python\Python35-x64\lib\site-packages\trio\_util.py", line 109, in __aexit__
await self._agen.asend(None)
File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 261, in asend
return await self._do_it(self._it.send, value)
File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 277, in _do_it
return await ANextIter(self._it, start_fn, *args)
File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 192, in send
return self._invoke(self._it.send, value)
File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 211, in _invoke
) from e
RuntimeError: async_generator raise StopAsyncIteration
Metadata
Metadata
Assignees
Labels
No labels