8000 extract late loop handling logic from _ActorFuture into _LateLoopEvent · dask/distributed@95f11e9 · GitHub
[go: up one dir, main page]

Skip to content

Commit 95f11e9

Browse files
committed
extract late loop handling logic from _ActorFuture into _LateLoopEvent
this keeps the somewhat odd late asyncio.Event() construction in one location that can be removed by pyupgrade
1 parent aba3c7c commit 95f11e9

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

distributed/actor.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import abc
22
import asyncio
33
import functools
4+
import sys
45
import threading
56

67
from .client import Future
@@ -9,6 +10,33 @@
910
from .utils_comm import WrappedKey
1011
from .worker import get_client, get_worker
1112

13+
if sys.version_info >= (3, 10):
14+
from asyncio import Event as _LateLoopEvent
15+
else:
16+
# In python 3.10 asyncio.Lock and other primitives no longer support
17+
# passing a loop kwarg to bind to a loop running in another thread
18+
# e.g. calling from Client(asynchronous=False). Instead the loop is bound
19+
# as late as possible: when calling any methods that wait on or wake
20+
# Future instances. See: https://bugs.python.org/issue42392
21+
class _LateLoopEvent:
22+
def __init__(self):
23+
self._event = None
24+
25+
def set(self):
26+
if self._event is None:
27+
self._event = asyncio.Event()
28+
29+
self._event.set()
30+
31+
def is_set(self):
32+
return self._event is not None and self._event.is_set()
33+
34+
async def wait(self):
35+
if self._event is None:
36+
self._event = asyncio.Event()
37+
38+
return await self._event.wait()
39+
1240

1341
class Actor(WrappedKey):
1442
"""Controls an object on a remote worker
@@ -259,29 +287,23 @@ def done(self):
259287
class _ActorFuture(ActorFuture):
260288
def __init__(self, io_loop):
261289
self._io_loop = io_loop
262-
self._event = None
290+
self._event = _LateLoopEvent()
263291
self._out = None
264292

265293
def __await__(self):
266294
return self._result().__await__()
267295

268296
def done(self):
269-
return self._event and self._event.is_set()
297+
return self._event.is_set()
270298

271299
async def _result(self):
272-
if self._event is None:
273-
self._event = asyncio.Event()
274-
275300
await self._event.wait()
276301
out = self._out
277302
if out["status"] == "OK":
278303
return out["result"]
279304
raise out["exception"]
280305

281306
def _set_result(self, out):
282-
if self._event is None:
283-
self._event = asyncio.Event()
284-
285307
self._out = out
286308
self._event.set()
287309

0 commit comments

Comments
 (0)
0