|
1 | 1 | import abc
|
2 | 2 | import asyncio
|
3 | 3 | import functools
|
| 4 | +import sys |
4 | 5 | import threading
|
5 | 6 |
|
6 | 7 | from .client import Future
|
|
9 | 10 | from .utils_comm import WrappedKey
|
10 | 11 | from .worker import get_client, get_worker
|
11 | 12 |
|
| 13 | +if sys.version_info >= (3, 10): |
| 14 | + from asyncio import Event as _LateLoopEvent |
| 15 | +else: |
| 16 | + # In python 3.10 asyncio.Lock and other primitives no longer support |
| 17 | + # passing a loop kwarg to bind to a loop running in another thread |
| 18 | + # e.g. calling from Client(asynchronous=False). Instead the loop is bound |
| 19 | + # as late as possible: when calling any methods that wait on or wake |
| 20 | + # Future instances. See: https://bugs.python.org/issue42392 |
| 21 | + class _LateLoopEvent: |
| 22 | + def __init__(self): |
| 23 | + self._event = None |
| 24 | + |
| 25 | + def set(self): |
| 26 | + if self._event is None: |
| 27 | + self._event = asyncio.Event() |
| 28 | + |
| 29 | + self._event.set() |
| 30 | + |
| 31 | + def is_set(self): |
| 32 | + return self._event is not None and self._event.is_set() |
| 33 | + |
| 34 | + async def wait(self): |
| 35 | + if self._event is None: |
| 36 | + self._event = asyncio.Event() |
| 37 | + |
| 38 | + return await self._event.wait() |
| 39 | + |
12 | 40 |
|
13 | 41 | class Actor(WrappedKey):
|
14 | 42 | """Controls an object on a remote worker
|
@@ -259,29 +287,23 @@ def done(self):
|
259 | 287 | class _ActorFuture(ActorFuture):
|
260 | 288 | def __init__(self, io_loop):
|
261 | 289 | self._io_loop = io_loop
|
262 |
| - self._event = None |
| 290 | + self._event = _LateLoopEvent() |
263 | 291 | self._out = None
|
264 | 292 |
|
265 | 293 | def __await__(self):
|
266 | 294 | return self._result().__await__()
|
267 | 295 |
|
268 | 296 | def done(self):
|
269 |
| - return self._event and self._event.is_set() |
| 297 | + return self._event.is_set() |
270 | 298 |
|
271 | 299 | async def _result(self):
|
272 |
| - if self._event is None: |
273 |
| - self._event = asyncio.Event() |
274 |
| - |
275 | 300 | await self._event.wait()
|
276 | 301 | out = self._out
|
277 | 302 | if out["status"] == "OK":
|
278 | 303 | return out["result"]
|
279 | 304 | raise out["exception"]
|
280 | 305 |
|
281 | 306 | def _set_result(self, out):
|
282 |
| - if self._event is None: |
283 |
| - self._event = asyncio.Event() |
284 |
| - |
285 | 307 | self._out = out
|
286 | 308 | self._event.set()
|
287 | 309 |
|
|
0 commit comments