8000 Merge pull request #1119 from datastax/python-1290 · datastax/python-driver@e4e290f · GitHub
[go: up one dir, main page]

Skip to content

Commit e4e290f

Browse files
authored
Merge pull request #1119 from datastax/python-1290
PYTHON-1290 Convert asyncio reactor away from @asyncio.coroutine
1 parent d3afae4 commit e4e290f

File tree

1 file changed

+13
-16
lines changed

1 file changed

+13
-16
lines changed

cassandra/io/asyncioreactor.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ def __init__(self, timeout, callback, loop):
4646
self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)
4747

4848
@staticmethod
49-
@asyncio.coroutine
50-
def _call_delayed_coro(timeout, callback, loop):
51-
yield from asyncio.sleep(timeout, loop=loop)
49+
async def _call_delayed_coro(timeout, callback, loop):
50+
await asyncio.sleep(timeout, loop=loop)
5251
return callback()
5352

5453
def __lt__(self, other):
@@ -136,8 +135,7 @@ def close(self):
136135
self._close(), loop=self._loop
137136
)
138137

139-
@asyncio.coroutine
140-
def _close(self):
138+
async def _close(self):
141139
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
142140
if self._write_watcher:
143141
self._write_watcher.cancel()
@@ -174,40 +172,39 @@ def push(self, data):
174172
# avoid races/hangs by just scheduling this, not using threadsafe
175173
self._loop.create_task(self._push_msg(chunks))
176174

177-
@asyncio.coroutine
178-
def _push_msg(self, chunks):
175+
async def _push_msg(self, chunks):
179176
# This lock ensures all chunks of a message are sequential in the Queue
180-
with (yield from self._write_queue_lock):
177+
with await self._write_queue_lock:
181178
for chunk in chunks:
182179
self._write_queue.put_nowait(chunk)
183180

184181

185-
@asyncio.coroutine
186-
def handle_write(self):
182+
async def handle_write(self):
187183
while True:
188184
try:
189-
next_msg = yield from self._write_queue.get()
185+
next_msg = await self._write_queue.get()
190186
if next_msg:
191-
yield from self._loop.sock_sendall(self._socket, next_msg)
187+
await self._loop.sock_sendall(self._socket, next_msg)
192188
except socket.error as err:
193189
log.debug("Exception in send for %s: %s", self, err)
194190
self.defunct(err)
195191
return
196192
except asyncio.CancelledError:
197193
return
198194

199-
@asyncio.coroutine
200-
def handle_read(self):
195+
async def handle_read(self):
201196
while True:
202197
try:
203-
buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
198+
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
204199
self._iobuf.write(buf)
205200
# sock_recv expects EWOULDBLOCK if socket provides no data, but
206201
# nonblocking ssl sockets raise these instead, so we handle them
207202
# ourselves by yielding to the event loop, where the socket will
208203
# get the reading/writing it "wants" before retrying
209204
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
210-
yield
205+
# Apparently the preferred way to yield to the event loop from within
206+
# a native coroutine based on https://github.com/python/asyncio/issues/284
207+
await asyncio.sleep(0)
211208
continue
212209
except socket.error as err:
213210
log.debug("Exception during socket recv for %s: %s",

0 commit comments

Comments
 (0)
0