8000 Add a BufferedReader and allow BufferedWriter to handle partial writes and errors after some data was written. by klondi · Pull Request #13390 · micropython/micropython · GitHub
[go: up one dir, main page]

Skip to content

Add a BufferedReader and allow BufferedWriter to handle partial writes and errors after some data was written. #13390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ports/esp32/mpconfigport.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
#define MICROPY_PY_ALL_INPLACE_SPECIAL_METHODS (1)
#define MICROPY_PY_BUILTINS_HELP_TEXT esp32_help_text
#define MICROPY_PY_IO_BUFFEREDWRITER (1)
#define MICROPY_PY_IO_BUFFEREDREADER (1)
#define MICROPY_PY_TIME_GMTIME_LOCALTIME_MKTIME (1)
#define MICROPY_PY_TIME_TIME_TIME_NS (1)
#define MICROPY_PY_TIME_INCLUDEFILE "ports/esp32/modtime.c"
Expand Down
189 changes: 158 additions & 31 deletions py/modio.c
6D40
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include <assert.h>
#include <string.h>

#include "py/mpconfig.h"
#include "py/misc.h"
#include "py/runtime.h"
#include "py/builtin.h"
#include "py/stream.h"
Expand Down Expand Up @@ -107,12 +109,15 @@

#endif // MICROPY_PY_IO_IOBASE

#if MICROPY_PY_IO_BUFFEREDWRITER
#if MICROPY_PY_IO_BUFFEREDWRITER || MICROPY_PY_IO_BUFFEREDREADER

// The structure and the new method are shared by reader and writer
//
// Field roles, as used by the code in this file:
//  - stream: the underlying stream object handed to the constructor; all
//    real reads/writes are forwarded to it.
//  - alloc:  size of the internal buffer; set once at construction.
//  - len:    number of bytes currently held in buf. The writer keeps them at
//    the start of buf; the reader keeps the unread bytes at the end of buf
//    (at offset alloc - len).
//  - error:  a stream error code held back from an earlier call so that it
//    can be reported on a later call; 0 means no pending error.
//  - buf:    trailing storage for the buffered bytes.
//    NOTE(review): presumably `alloc` bytes are allocated for it by the
//    constructor -- confirm against bufwriter_make_new.
typedef struct _mp_obj_bufwriter_t {
mp_obj_base_t base;
mp_obj_t stream;
size_t alloc;
size_t len;
int error;
byte buf[0];
} mp_obj_bufwriter_t;

Expand All @@ -123,61 +128,100 @@
o->stream = args[0];
o->alloc = alloc;
o->len = 0;
o->error = 0;
return o;
}

#endif // MICROPY_PY_IO_BUFFEREDWRITER || MICROPY_PY_IO_BUFFEREDREADER

#if MICROPY_PY_IO_BUFFEREDWRITER
// Writes out the data stored in the buffer so far
//
// Passes the first self->len bytes of self->buf to the underlying stream via
// mp_stream_write_exactly(). The number of bytes reported as written is
// subtracted from self->len, and any bytes that were not written are moved
// to the start of the buffer so they stay queued for a later attempt.
//
// Returns the error code set by mp_stream_write_exactly(), or 0 if it did
// not set one. Callers must also check self->len afterwards to find out
// whether the buffer was emptied completely.
static int bufwriter_do_write(mp_obj_bufwriter_t *self) {
int rv = 0;
// NOTE(review): the comment below presumably means that a result of zero
// bytes written is always accompanied by a non-zero rv -- confirm against
// the implementation of mp_stream_write_exactly().
// This cannot return 0 without an error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on what you mean with this? Like: why would it be an issue for this bit of code if it were 0?

mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->len, &rv);
// Assumes out_sz <= self->len, i.e. the stream never reports more bytes
// written than it was given -- TODO confirm.
self->len -= out_sz;
// Copy the non written characters back to the beginning
if (self->len != 0) {
// Use memmove since there might be overlaps
memmove(self->buf, self->buf + out_sz, self->len);
}
return rv;
}

static mp_uint_t bufwriter_write(mp_obj_t self_in, const void *buf, mp_uint_t size, int *errcode) {
mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in);

mp_uint_t org_size = size;
// Alloc should always remain the same so cache it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: this comment ends with a period (which I think is best), but the other comments do not. Also, the comment below is placed after its statement instead of on the line above it.

size_t alloc = self->alloc;
mp_uint_t rem = 0; // No data has been copied from the buffer so far

while (size > 0) {
mp_uint_t rem = self->alloc - self->len;
if (size < rem) {
memcpy(self->buf + self->len, buf, size);
self->len += size;
return org_size;
}
if (self->error != 0) {
*errcode = self->error;
self->error = 0;
return MP_STREAM_ERROR;

Check warning on line 163 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L161-L163

Added lines #L161 - L163 were not covered by tests
}

// Using this form allows ensuring that we try to empty the buffer if it is
// full without trying to extract any data first.
while (true) {
// Buffer flushing policy here is to flush entire buffer all the time.
// This allows e.g. to have a block device as backing storage and write
// entire block to it. memcpy below is not ideal and could be optimized
// in some cases. But the way it is now it at least ensures that buffer
// is word-aligned, to guard against obscure cases when it matters, e.g.
// https://github.com/micropython/micropython/issues/1863
// Try to empty the buffer first
if (self->len == alloc) {
if ((*errcode = bufwriter_do_write(self)) != 0) {
// If this is the first write with data from the new buffer,
// but no data from the new buffer was written out,
// then remove the data added from the new buffer and raise
// the error.
if (org_size == size + rem && self->len >= rem) {
// Remove the extra non-written data from the buffer and error
self->len -= rem;
return MP_STREAM_ERROR;
}
// Some data from the new buffer has been written: roll back as much
// as possible, return what was written, and then raise the error
// on the next call.
size += self->len;
self->len = 0;
self->error = *errcode;
*errcode = 0;
return org_size - size;

Check warning on line 194 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L190-L194

Added lines #L190 - L194 were not covered by tests
}
}
// No data left to write
if (size == 0) {
return org_size;
}

rem = MIN(alloc - self->len, size);
memcpy(self->buf + self->len, buf, rem);
self->len += rem;
buf = (byte *)buf + rem;
size -= rem;
mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->alloc, errcode);
(void)out_sz;
if (*errcode != 0) {
return MP_STREAM_ERROR;
}
// TODO: try to recover from a case of non-blocking stream, e.g. move
// remaining chunk to the beginning of buffer.
assert(out_sz == self->alloc);
self->len = 0;
}

return org_size;
}

static mp_obj_t bufwriter_flush(mp_obj_t self_in) {
mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in);

if (self->len != 0) {
int err;
mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->len, &err);
(void)out_sz;
// TODO: try to recover from a case of non-blocking stream, e.g. move
// remaining chunk to the beginning of buffer.
assert(out_sz == self->len);
self->len = 0;
if (err != 0) {
mp_raise_OSError(err);
int err = self->error;
if (err == 0 && self->len != 0) {
err = bufwriter_do_write(self);
// If we couldn't flush the whole buffer notify the user.
if (err == 0 && self->len != 0) {
err = MP_EAGAIN;
}
}

// If there is an error raise it
if (err != 0) {
self->error = 0;
mp_raise_OSError(err);

Check warning on line 223 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L222-L223

Added lines #L222 - L223 were not covered by tests
}
return mp_const_none;
}
static MP_DEFINE_CONST_FUN_OBJ_1(bufwriter_flush_obj, bufwriter_flush);
Expand All @@ -200,8 +244,88 @@
protocol, &bufwriter_stream_p,
locals_dict, &bufwriter_locals_dict
);

#endif // MICROPY_PY_IO_BUFFEREDWRITER

#if MICROPY_PY_IO_BUFFEREDREADER

// Stream-protocol read handler for BufferedReader.
//
// Copies up to `size` bytes into `buf`, serving them from the internal
// buffer and refilling that buffer from the underlying stream (always asking
// for `alloc` bytes) whenever it is empty. Unread bytes are kept at the END
// of self->buf, i.e. at offset (alloc - self->len), so a short refill is
// shifted to the end before being consumed.
//
// Error handling: an error code saved in self->error by a previous call is
// reported first. If an error occurs after some bytes were already copied
// out, the bytes are returned, *errcode is cleared, and the error is saved
// in self->error for the next call.
//
// Returns the number of bytes copied into `buf` (which may be less than
// `size` when the refill returns no data), or MP_STREAM_ERROR with *errcode
// set when an error occurred and no bytes were copied.
static mp_uint_t bufreader_read(mp_obj_t self_in, void *buf, mp_uint_t size, int *errcode) {
mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in);

Check warning on line 253 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L253

Added line #L253 was not covered by tests

mp_uint_t org_size = size;

Check warning on line 255 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L255

Added line #L255 was not covered by tests
// Cache this value since it should not change thus avoiding a dereference
size_t alloc = self->alloc;

Check warning on line 257 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L257

Added line #L257 was not covered by tests

// Start from any error held back by a previous call; a non-zero value makes
// the loop below a no-op so the error is reported without reading anything.
*errcode = self->error;
self->error = 0;

Check warning on line 260 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L259-L260

Added lines #L259 - L260 were not covered by tests

while (*errcode == 0 && size > 0) {

Check warning on line 262 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L262

Added line #L262 was not covered by tests
// Buffer filling policy here is to fill the entire buffer all the time.
// This allows e.g. to have a block device as backing storage and read
// entire blocks from it. memcpy below is not ideal and could be
// optimized in some cases. But the way it is now it at least ensures
// that buffer is word-aligned, to guard against obscure cases when it
// matters, e.g.
// https://github.com/micropython/micropython/issues/1863

// Buffer needs to have at least one byte
if (self->len == 0) {
self->len = mp_stream_read_exactly(self->stream, self->buf, alloc, errcode);
if (self->len < alloc) {

Check warning on line 274 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L272-L274

Added lines #L272 - L274 were not covered by tests
// If no data was read stop the loop as it is either an error or an EOF
// This check is moved here because it speeds up the more common case
// where the buffer was completely filled.
if (self->len == 0) {

Check warning on line 278 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L278

Added line #L278 was not covered by tests
break;
}
// Short fill: shift the bytes to the end of the buffer, because unread
// data is always consumed from offset (alloc - self->len).
// Buffers may overlap, therefore we use memmove instead of memcpy
memmove(self->buf + (alloc - self->len), self->buf, self->len);

Check warning on line 282 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L282

Added line #L282 was not covered by tests
}
}
// Hand over as much buffered data as the caller still wants.
mp_uint_t rem = MIN(self->len, size);
memcpy(buf, self->buf + (alloc - self->len), rem);
self->len -= rem;
buf = (byte *)buf + rem;
size -= rem;

Check warning on line 289 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L285-L289

Added lines #L285 - L289 were not covered by tests
}

// If there is an error (held back from a previous call or raised just now)
if (*errcode != 0) {

Check warning on line 293 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L293

Added line #L293 was not covered by tests
// If no data was read, report the error immediately
if (size == org_size) {

Check warning on line 295 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L295

Added line #L295 was not covered by tests
return MP_STREAM_ERROR;
}
// Otherwise save it so we can fail on the next call
self->error = *errcode;
*errcode = 0;

Check warning on line 300 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L299-L300

Added lines #L299 - L300 were not covered by tests
}

// Return the data that has been read so far
return org_size - size;

Check warning on line 304 in py/modio.c

View check run for this annotation

Codecov / codecov/patch

py/modio.c#L304

Added line #L304 was not covered by tests
}

// Attribute table for BufferedReader: maps the names read, readinto and
// readline to the shared stream method objects. Note that readline is bound
// to mp_stream_unbuffered_readline_obj.
static const mp_rom_map_elem_t bufreader_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR_read), MP_ROM_PTR(&mp_stream_read_obj) },
{ MP_ROM_QSTR(MP_QSTR_readinto), MP_ROM_PTR(&mp_stream_readinto_obj) },
{ MP_ROM_QSTR(MP_QSTR_readline), MP_ROM_PTR(&mp_stream_unbuffered_readline_obj) },
};
static MP_DEFINE_CONST_DICT(bufreader_locals_dict, bufreader_locals_dict_table);

// Stream protocol table for BufferedReader: only the read slot is set
// (to bufreader_read); the other slots are left zero-initialized.
static const mp_stream_p_t bufreader_stream_p = {
.read = bufreader_read,
};

// Type object for io.BufferedReader. It reuses bufwriter_make_new as its
// constructor (the object layout is shared with BufferedWriter) and plugs in
// the reader-specific stream protocol and attribute table.
static MP_DEFINE_CONST_OBJ_TYPE(
mp_type_bufreader,
MP_QSTR_BufferedReader,
MP_TYPE_FLAG_NONE,
make_new, bufwriter_make_new,
protocol, &bufreader_stream_p,
locals_dict, &bufreader_locals_dict
);

#endif // MICROPY_PY_IO_BUFFEREDREADER

static const mp_rom_map_elem_t mp_module_io_globals_table[] = {
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_io) },
// Note: mp_builtin_open_obj should be defined by port, it's not
Expand All @@ -217,6 +341,9 @@
#if MICROPY_PY_IO_BUFFEREDWRITER
{ MP_ROM_QSTR(MP_QSTR_BufferedWriter), MP_ROM_PTR(&mp_type_bufwriter) },
#endif
#if MICROPY_PY_IO_BUFFEREDREADER
{ MP_ROM_QSTR(MP_QSTR_BufferedReader), MP_ROM_PTR(&mp_type_bufreader) },
#endif
};

static MP_DEFINE_CONST_DICT(mp_module_io_globals, mp_module_io_globals_table);
Expand Down
5 changes: 5 additions & 0 deletions py/mpconfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,11 @@ typedef double mp_float_t;
#define MICROPY_PY_IO_BUFFEREDWRITER (MICROPY_CONFIG_ROM_LEVEL_AT_LEAST_EVERYTHING)
#endif

// Whether to provide "io.BufferedReader" class
#ifndef MICROPY_PY_IO_BUFFEREDREADER
#define MICROPY_PY_IO_BUFFEREDREADER (MICROPY_CONFIG_ROM_LEVEL_AT_LEAST_EVERYTHING)
#endif

// Whether to provide "struct" module
#ifndef MICROPY_PY_STRUCT
#define MICROPY_PY_STRUCT (MICROPY_CONFIG_ROM_LEVEL_AT_LEAST_CORE_FEATURES)
Expand Down
Loading
0