8000 streams.StreamReader: Use bytearray instead of deque of bytes for int… · python/asyncio@49ef421 · GitHub
[go: up one dir, main page]

Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Commit 49ef421

Browse files
committed
streams.StreamReader: Use bytearray instead of deque of bytes for internal buffer
1 parent 46c9f5a commit 49ef421

File tree

2 files changed

+74
-76
lines changed

2 files changed

+74
-76
lines changed

asyncio/streams.py

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
'open_connection', 'start_server', 'IncompleteReadError',
55
]
66

7-
import collections
8-
97
from . import events
108
from . import futures
119
from . import protocols
@@ -259,9 +257,7 @@ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
259257
if loop is None:
260258
loop = events.get_event_loop()
261259
self._loop = loop
262-
# TODO: Use a bytearray for a buffer, like the transport.
263-
self._buffer = collections.deque() # Deque of bytes objects.
264-
self._byte_count = 0 # Bytes in buffer.
260+
self._buffer = bytearray()
265261
self._eof = False # Whether we're done.
266262
self._waiter = None # A future.
267263
self._exception = None
@@ -285,7 +281,7 @@ def set_transport(self, transport):
285281
self._transport = transport
286282

287283
def _maybe_resume_transport(self):
288-
if self._paused and self._byte_count <= self._limit:
284+
if self._paused and len(self._buffer) <= self._limit:
289285
self._paused = False
290286
self._transport.resume_reading()
291287

@@ -301,8 +297,7 @@ def feed_data(self, data):
301297
if not data:
302298
return
303299

304-
self._buffer.append(data)
305-
self._byte_count += len(data)
300+
self._buffer.extend(data)
306301

307302
waiter = self._waiter
308303
if waiter is not None:
@@ -312,7 +307,7 @@ def feed_data(self, data):
312307

313308
if (self._transport is not None and
314309
not self._paused and
315-
self._byte_count > 2*self._limit):
310+
len(self._buffer) > 2*self._limit):
316311
try:
317312
self._transport.pause_reading()
318313
except NotImplementedError:
@@ -338,28 +333,22 @@ def readline(self):
338333
if self._exception is not None:
339334
raise self._exception
340335

341-
parts = []
342-
parts_size = 0
336+
line = bytearray()
343337
not_enough = True
344338

345339
while not_enough:
346340
while self._buffer and not_enough:
347-
data = self._buffer.popleft()
348-
ichar = data.find(b'\n')
341+
ichar = self._buffer.find(b'\n')
349342
if ichar < 0:
350-
parts.append(data)
351-
parts_size += len(data)
343+
line.extend(self._buffer)
344+
self._buffer.clear()
352345
else:
353346
ichar += 1
354-
head, tail = data[:ichar], data[ichar:]
355-
if tail:
356-
self._buffer.appendleft(tail)
347+
line.extend(self._buffer[:ichar])
348+
del self._buffer[:ichar]
357349
not_enough = False
358-
parts.append(head)
359-
parts_size += len(head)
360350

361-
if parts_size > self._limit:
362-
self._byte_count -= parts_size
351+
if len(line) > self._limit:
363352
self._maybe_resume_transport()
364353
raise ValueError('Line is too long')
365354

@@ -373,11 +362,8 @@ def readline(self):
373362
finally:
374363
self._waiter = None
375364

376-
line = b''.join(parts)
377-
self._byte_count -= parts_size
378365
self._maybe_resume_transport()
379-
380-
return line
366+
return bytes(line)
381367

382368
@tasks.coroutine
383369
def read(self, n=-1):
@@ -395,36 +381,23 @@ def read(self, n=-1):
395381
finally:
396382
self._waiter = None
397383
else:
398-
if not self._byte_count and not self._eof:
384+
if not self._buffer and not self._eof:
399385
self._waiter = self._create_waiter('read')
400386
try:
401387
yield from self._waiter
402388
finally:
403389
self._waiter = None
404390

405-
if n < 0 or self._byte_count <= n:
406-
data = b''.join(self._buffer)
391+
if n < 0 or len(self._buffer) <= n:
392+
data = bytes(self._buffer)
407393
self._buffer.clear()
408-
self._byte_count = 0
409-
self._maybe_resume_transport()
410-
return data
411-
412-
parts = []
413-
parts_bytes = 0
414-
while self._buffer and parts_bytes < n:
415-
data = self._buffer.popleft()
416-
data_bytes = len(data)
417-
if n < parts_bytes + data_bytes:
418-
data_bytes = n - parts_bytes
419-
data, rest = data[:data_bytes], data[data_bytes:]
420-
self._buffer.appendleft(rest)
421-
422-
parts.append(data)
423-
parts_bytes += data_bytes
424-
self._byte_count -= data_bytes
425-
self._maybe_resume_transport()
426-
427-
return b''.join(parts)
394+
else:
395+
# n > 0 and len(self._buffer) > n
396+
data = bytes(self._buffer[:n])
397+
del self._buffer[:n]
398+
399+
self._maybe_resume_transport()
400+
return data
428401

429402
@tasks.coroutine
430403
def readexactly(self, n):

tests/test_streams.py

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,13 @@ def test_feed_empty_data(self):
7979
stream = asyncio.StreamReader(loop=self.loop)
8080

8181
stream.feed_data(b'')
82-
self.assertEqual(0, stream._byte_count)
82+
self.assertEqual(b'', stream._buffer)
8383

84-
def test_feed_data_byte_count(self):
84+
def test_feed_nonempty_data(self):
8585
stream = asyncio.StreamReader(loop=self.loop)
8686

8787
stream.feed_data(self.DATA)
88-
self.assertEqual(len(self.DATA), stream._byte_count)
88+
self.assertEqual(self.DATA, stream._buffer)
8989

9090
def test_read_zero(self):
9191
# Read zero bytes.
@@ -94,7 +94,7 @@ def test_read_zero(self):
9494

9595
data = self.loop.run_until_complete(stream.read(0))
9696
self.assertEqual(b'', data)
97-
self.assertEqual(len(self.DATA), stream._byte_count)
97+
self.assertEqual(self.DATA, stream._buffer)
9898

9999
def test_read(self):
100100
# Read bytes.
@@ -107,7 +107,7 @@ def cb():
107107

108108
data = self.loop.run_until_complete(read_task)
109109
self.assertEqual(self.DATA, data)
110-
self.assertFalse(stream._byte_count)
110+
self.assertEqual(b'', stream._buffer)
111111

112112
def test_read_line_breaks(self):
113113
# Read bytes without line breaks.
@@ -118,7 +118,7 @@ def test_read_line_breaks(self):
118118
data = self.loop.run_until_complete(stream.read(5))
119119

120120
self.assertEqual(b'line1', data)
121-
self.assertEqual(5, stream._byte_count)
121+
self.assertEqual(b'line2', stream._buffer)
122122

123123
def test_read_eof(self):
124124
# Read bytes, stop at eof.
@@ -131,7 +131,7 @@ def cb():
131131

132132
data = self.loop.run_until_complete(read_task)
133133
self.assertEqual(b'', data)
134-
self.assertFalse(stream._byte_count)
134+
self.assertEqual(b'', stream._buffer)
135135

136136
def test_read_until_eof(self):
137137
# Read all bytes until eof.
@@ -147,7 +147,7 @@ def cb():
147147
data = self.loop.run_until_complete(read_task)
148148

149149
self.assertEqual(b'chunk1\nchunk2', data)
150-
self.assertFalse(stream._byte_count)
150+
self.assertEqual(b'', stream._buffer)
151151

152152
def test_read_exception(self):
153153
stream = asyncio.StreamReader(loop=self.loop)
@@ -161,7 +161,8 @@ def test_read_exception(self):
161161
ValueError, self.loop.run_until_complete, stream.read(2))
162162

163163
def test_readline(self):
164-
# Read one line.
164+
# Read one line. 'readline' will need to wait for the data
165+
# to come from 'cb'
165166
stream = asyncio.StreamReader(loop=self.loop)
166167
stream.feed_data(b'chunk1 ')
167168
read_task = asyncio.Task(stream.readline(), loop=self.loop)
@@ -174,30 +175,40 @@ def cb():
174175

175176
line = self.loop.run_until_complete(read_task)
176177
self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)
177-
self.assertEqual(len(b'\n chunk4')-1, stream._byte_count)
F438 178+
self.assertEqual(b' chunk4', stream._buffer)
178179

179180
def test_readline_limit_with_existing_data(self):
180-
stream = asyncio.StreamReader(3, loop=self.loop)
181+
# Read one line. The data is in StreamReader's buffer
182+
# before the event loop is run.
183+
184+
stream = asyncio.StreamReader(limit=3, loop=self.loop)
181185
stream.feed_data(b'li')
182186
stream.feed_data(b'ne1\nline2\n')
183187

184188
self.assertRaises(
185189
ValueError, self.loop.run_until_complete, stream.readline())
186-
self.assertEqual([b'line2\n'], list(stream._buffer))
190+
# The buffer should contain the remaining data after exception
191+
self.assertEqual(b'line2\n', stream._buffer)
187192

188-
stream = asyncio.StreamReader(3, loop=self.loop)
193+
stream = asyncio.StreamReader(limit=3, loop=self.loop)
189194
stream.feed_data(b'li')
190195
stream.feed_data(b'ne1')
191196
stream.feed_data(b'li')
192197

193198
self.assertRaises(
194199
ValueError, self.loop.run_until_complete, stream.readline())
195-
self.assertEqual([b'li'], list(stream._buffer))
196-
self.assertEqual(2, stream._byte_count)
200+
# No b'\n' at the end. The 'limit' is set to 3. So before
201+
# waiting for the new data in buffer, 'readline' will consume
202+
# the entire buffer, and since the length of the consumed data
203+
# is more than 3, it will raise a ValudError. The buffer is
204+
# expected to be empty now.
205+
self.assertEqual(b'', stream._buffer)
197206

198207
def test_readline_limit(self):
199-
stream = asyncio.StreamReader(7, loop=self.loop)
208+
# Read one line. StreamReaders are fed with data after
209+
# their 'readline' methods are called.
200210

211+
stream = asyncio.StreamReader(limit=7, loop=self.loop)
201212
def cb():
202213
stream.feed_data(b'chunk1')
203214
stream.feed_data(b'chunk2')
@@ -207,18 +218,33 @@ def cb():
207218

208219
self.assertRaises(
209220
ValueError, self.loop.run_until_complete, stream.readline())
210-
self.assertEqual([b'chunk3\n'], list(stream._buffer))
211-
self.assertEqual(7, stream._byte_count)
221+
# The buffer had just one line of data, and after raising
222+
# a ValueError it should be empty.
223+
self.assertEqual(b'', stream._buffer)
224+
225+
stream = asyncio.StreamReader(limit=7, loop=self.loop)
226+
def cb():
227+
stream.feed_data(b'chunk1')
228+
stream.feed_data(b'chunk2\n')
229+
stream.feed_data(b'chunk3\n')
230+
stream.feed_eof()
231+
self.loop.call_soon(cb)
232+
233+
self.assertRaises(
234+
ValueError, self.loop.run_until_complete, stream.readline())
235+
self.assertEqual(b'chunk3\n', stream._buffer)
212236

213-
def test_readline_line_byte_count(self):
237+
def test_readline_nolimit_nowait(self):
238+
# All needed data for the first 'readline' call will be
239+
# in the buffer.
214240
stream = asyncio.StreamReader(loop=self.loop)
215241
stream.feed_data(self.DATA[:6])
216242
stream.feed_data(self.DATA[6:])
217243

218244
line = self.loop.run_until_complete(stream.readline())
219245

220246
self.assertEqual(b'line1\n', line)
221-
self.assertEqual(len(self.DATA) - len(b'line1\n'), stream._byte_count)
247+
self.assertEqual(b'line2\nline3\n', stream._buffer)
222248

223249
def test_readline_eof(self):
224250
stream = asyncio.StreamReader(loop=self.loop)
@@ -244,9 +270,7 @@ def test_readline_read_byte_count(self):
244270
data = self.loop.run_until_complete(stream.read(7))
245271

246272
self.assertEqual(b'line2\nl', data)
247-
self.assertEqual(
248-
len(self.DATA) - len(b'line1\n') - len(b'line2\nl'),
249-
stream._byte_count)
273+
self.assertEqual(b'ine3\n', stream._buffer)
250274

251275
def test_readline_exception(self):
252276
stream = asyncio.StreamReader(loop=self.loop)
@@ -258,6 +282,7 @@ def test_readline_exception(self):
258282
stream.set_exception(ValueError())
259283
self.assertRaises(
260284
ValueError, self.loop.run_until_complete, stream.readline())
285+
self.assertEqual(b'', stream._buffer)
261286

262287
def test_readexactly_zero_or_less(self):
263288
# Read exact number of bytes (zero or less).
@@ -266,11 +291,11 @@ def test_readexactly_zero_or_less(self):
266291

267292
data = self.loop.run_until_complete(stream.readexactly(0))
268293
self.assertEqual(b'', data)
269-
self.assertEqual(len(self.DATA), stream._byte_count)
294+
self.assertEqual(self.DATA, stream._buffer)
270295

271296
data = self.loop.run_until_complete(stream.readexactly(-1))
272297
self.assertEqual(b'', data)
273-
self.assertEqual(len(self.DATA), stream._byte_count)
298+
self.assertEqual(self.DATA, stream._buffer)
274299

275300
def test_readexactly(self):
276301
# Read exact number of bytes.
@@ -287,7 +312,7 @@ def cb():
287312

288313
data = self.loop.run_until_complete(read_task)
289314
self.assertEqual(self.DATA + self.DATA, data)
290-
self.assertEqual(len(self.DATA), stream._byte_count)
315+
self.assertEqual(self.DATA, stream._buffer)
291316

292317
def test_readexactly_eof(self):
293318
# Read exact number of bytes (eof).
@@ -306,7 +331,7 @@ def cb():
306331
self.assertEqual(cm.exception.expected, n)
307332
self.assertEqual(str(cm.exception),
308333
'18 bytes read on a total of 36 expected bytes')
309-
self.assertFalse(stream._byte_count)
334+
self.assertEqual(b'', stream._buffer)
310335

311336
def test_readexactly_exception(self):
312337
stream = asyncio.StreamReader(loop=self.loop)

0 commit comments

Comments
 (0)
0