From 92aaf70b8e3e3afe545952e3e6e79d674efe4f85 Mon Sep 17 00:00:00 2001 From: "Francisco Blas (klondike) Izquierdo Riera" Date: Sun, 7 Jan 2024 23:04:33 +0100 Subject: [PATCH 1/2] py/modio: Handle partial writes on BufferedWriter. To simplify the logic, create bufwriter_do_write. This function keeps track of the data left in self->len and, if a partial write happens, uses memmove to move the remaining data back to the beginning of the buffer. This allows simplifying bufwriter_write and bufwriter_flush significantly. bufwriter_flush now only needs to call this function if the buffer has some data stored and check for any error codes. Additionally, if the buffer is only partially flushed, we notify the user too (so that they know there might be data left). bufwriter_write now just needs to call this function whenever the buffer gets full and copy the input into the buffer. It will return when either no data is written at all or when all of the input is consumed. In the case of a partial write, it returns exactly the amount of data which was written. Additionally, allow caching of errors to better handle partial writes. Until now, if an error occurred during the write, the error would be raised and the caller had no way to know if any data was written at all (for example in prior calls if more than one block of data was passed as input). Now, when we have written out some data and an error happens, we reset the buffer to the state it would have if it did not contain the data that was not written (and which was not buffered previously), and then we return the amount of data that was written (if any) or raise an error if no data from the input was written. This gives the programmer better control over writes. In particular, the programmer will know exactly how much of their last input was written, allowing them to handle whatever data is left to be written in a better way. 
Signed-off-by: Francisco Blas (klondike) Izquierdo Riera --- py/modio.c | 99 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/py/modio.c b/py/modio.c index d3e563dbcf447..c3960448e275a 100644 --- a/py/modio.c +++ b/py/modio.c @@ -27,6 +27,8 @@ #include #include +#include "py/mpconfig.h" +#include "py/misc.h" #include "py/runtime.h" #include "py/builtin.h" #include "py/stream.h" @@ -113,6 +115,7 @@ typedef struct _mp_obj_bufwriter_t { mp_obj_t stream; size_t alloc; size_t len; + int error; byte buf[0]; } mp_obj_bufwriter_t; @@ -123,61 +126,97 @@ static mp_obj_t bufwriter_make_new(const mp_obj_type_t *type, size_t n_args, siz o->stream = args[0]; o->alloc = alloc; o->len = 0; + o->error = 0; return o; } +// Writes out the data stored in the buffer so far +static int bufwriter_do_write(mp_obj_bufwriter_t *self) { + int rv = 0; + // This cannot return 0 without an error + mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->len, &rv); + self->len -= out_sz; + // Copy the non written characters back to the beginning + if (self->len != 0) { + // Use memmove since there might be overlaps + memmove(self->buf, self->buf + out_sz, self->len); + } + return rv; +} + static mp_uint_t bufwriter_write(mp_obj_t self_in, const void *buf, mp_uint_t size, int *errcode) { mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in); mp_uint_t org_size = size; + // Alloc should always remain the same so cache it. + size_t alloc = self->alloc; + mp_uint_t rem = 0; // No data has been copied from the buffer so far - while (size > 0) { - mp_uint_t rem = self->alloc - self->len; - if (size < rem) { - memcpy(self->buf + self->len, buf, size); - self->len += size; - return org_size; - } + if (self->error != 0) { + *errcode = self->error; + self->error = 0; + return MP_STREAM_ERROR; + } + // Using this form allows ensuring that we try to empty the buffer if it is + // full without trying to extract any data first. 
+ while (true) { // Buffer flushing policy here is to flush entire buffer all the time. // This allows e.g. to have a block device as backing storage and write // entire block to it. memcpy below is not ideal and could be optimized // in some cases. But the way it is now it at least ensures that buffer // is word-aligned, to guard against obscure cases when it matters, e.g. // https://github.com/micropython/micropython/issues/1863 + // Try to empty the buffer first + if (self->len == alloc) { + if ((*errcode = bufwriter_do_write(self)) != 0) { + // If this is the first write with data from the new buffer + // But no data from the new buffer was written out, + // then remove the data added from the new buffer and raise + // the error + if (org_size == size + rem && self->len >= rem) { + // Remove the extra non-written data from the buffer and error + self->len -= rem; + return MP_STREAM_ERROR; + } + // Some data from the new buffer has been written rollback as much + // as possible and return what was written then raise an error + // on the next call. + size += self->len; + self->len = 0; + self->error = *errcode; + *errcode = 0; + return org_size - size; + } + } + // No data left to write + if (size == 0) { + return org_size; + } + + rem = MIN(alloc - self->len, size); memcpy(self->buf + self->len, buf, rem); + self->len += rem; buf = (byte *)buf + rem; size -= rem; - mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->alloc, errcode); - (void)out_sz; - if (*errcode != 0) { - return MP_STREAM_ERROR; - } - // TODO: try to recover from a case of non-blocking stream, e.g. move - // remaining chunk to the beginning of buffer. 
- assert(out_sz == self->alloc); - self->len = 0; } - - return org_size; } static mp_obj_t bufwriter_flush(mp_obj_t self_in) { mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in); - - if (self->len != 0) { - int err; - mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->len, &err); - (void)out_sz; - // TODO: try to recover from a case of non-blocking stream, e.g. move - // remaining chunk to the beginning of buffer. - assert(out_sz == self->len); - self->len = 0; - if (err != 0) { - mp_raise_OSError(err); + int err = self->error; + if (err == 0 && self->len != 0) { + err = bufwriter_do_write(self); + // If we couldn't flush the whole buffer notify the user. + if (err == 0 && self->len != 0) { + err = MP_EAGAIN; } } - + // If there is an error raise it + if (err != 0) { + self->error = 0; + mp_raise_OSError(err); + } return mp_const_none; } static MP_DEFINE_CONST_FUN_OBJ_1(bufwriter_flush_obj, bufwriter_flush); From 2c4d1dbd80a89e995c7dd63792565c4a1177ccd0 Mon Sep 17 00:00:00 2001 From: "Francisco Blas (klondike) Izquierdo Riera" Date: Sun, 7 Jan 2024 23:48:40 +0100 Subject: [PATCH 2/2] py/modio: Implement BufferedReader. Sometimes there is a need for a BufferedReader that works in a way similar to BufferedWriter. A clear example is when using an underlying device requiring aligned reads, but a less clear example is when using deflate.DeflateIO, which will do only 1-byte reads and can become crippling quickly when the underlying object is a Python-implemented stream instead of a native one. The BufferedReader will only attempt to do full-buffer reads and ensures word-alignment in a way similar to how the writer does. Similarly, it will also defer any errors when partial reads happen, to ensure that any data copied so far can be returned first. 
Signed-off-by: Francisco Blas (klondike) Izquierdo Riera --- ports/esp32/mpconfigport.h | 1 + py/modio.c | 90 +++++++++++++++++++++++++++++++++++++- py/mpconfig.h | 5 +++ 3 files changed, 95 insertions(+), 1 deletion(-) diff --git a/ports/esp32/mpconfigport.h b/ports/esp32/mpconfigport.h index 7c26009578928..bfb11e2c6430e 100644 --- a/ports/esp32/mpconfigport.h +++ b/ports/esp32/mpconfigport.h @@ -76,6 +76,7 @@ #define MICROPY_PY_ALL_INPLACE_SPECIAL_METHODS (1) #define MICROPY_PY_BUILTINS_HELP_TEXT esp32_help_text #define MICROPY_PY_IO_BUFFEREDWRITER (1) +#define MICROPY_PY_IO_BUFFEREDREADER (1) #define MICROPY_PY_TIME_GMTIME_LOCALTIME_MKTIME (1) #define MICROPY_PY_TIME_TIME_TIME_NS (1) #define MICROPY_PY_TIME_INCLUDEFILE "ports/esp32/modtime.c" diff --git a/py/modio.c b/py/modio.c index c3960448e275a..9b8ebc7a138b1 100644 --- a/py/modio.c +++ b/py/modio.c @@ -109,7 +109,9 @@ static MP_DEFINE_CONST_OBJ_TYPE( #endif // MICROPY_PY_IO_IOBASE -#if MICROPY_PY_IO_BUFFEREDWRITER +#if MICROPY_PY_IO_BUFFEREDWRITER || MICROPY_PY_IO_BUFFEREDREADER + +// The structure and the new method are shared by reader and writer typedef struct _mp_obj_bufwriter_t { mp_obj_base_t base; mp_obj_t stream; @@ -130,6 +132,9 @@ static mp_obj_t bufwriter_make_new(const mp_obj_type_t *type, size_t n_args, siz return o; } +#endif // MICROPY_PY_IO_BUFFEREDWRITER || MICROPY_PY_IO_BUFFEREDREADER + +#if MICROPY_PY_IO_BUFFEREDWRITER // Writes out the data stored in the buffer so far static int bufwriter_do_write(mp_obj_bufwriter_t *self) { int rv = 0; @@ -239,8 +244,88 @@ static MP_DEFINE_CONST_OBJ_TYPE( protocol, &bufwriter_stream_p, locals_dict, &bufwriter_locals_dict ); + #endif // MICROPY_PY_IO_BUFFEREDWRITER +#if MICROPY_PY_IO_BUFFEREDREADER + +static mp_uint_t bufreader_read(mp_obj_t self_in, void *buf, mp_uint_t size, int *errcode) { + mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in); + + mp_uint_t org_size = size; + // Cache this value since it should not change thus avoiding a dereference + 
size_t alloc = self->alloc; + + *errcode = self->error; + self->error = 0; + + while (*errcode == 0 && size > 0) { + // Buffer filling policy here is to fill the entire buffer all the time. + // This allows e.g. to have a block device as backing storage and read + // entire blocks from it. memcpy below is not ideal and could be + // optimized in some cases. But the way it is now it at least ensures + // that buffer is word-aligned, to guard against obscure cases when it + // matters, e.g. + // https://github.com/micropython/micropython/issues/1863 + + // Buffer needs to have at least one byte + if (self->len == 0) { + self->len = mp_stream_read_exactly(self->stream, self->buf, alloc, errcode); + if (self->len < alloc) { + // If no data was read stop the loop as it is either an error or an EOF + // This check is moved here because it speeds up the more common case + // where the buffer was completely filled. + if (self->len == 0) { + break; + } + // Buffers may overlap, therefore we use memmove instead of memcpy + memmove(self->buf + (alloc - self->len), self->buf, self->len); + } + } + mp_uint_t rem = MIN(self->len, size); + memcpy(buf, self->buf + (alloc - self->len), rem); + self->len -= rem; + buf = (byte *)buf + rem; + size -= rem; + } + + // If there is a prior error + if (*errcode != 0) { + // If no data was read return it + if (size == org_size) { + return MP_STREAM_ERROR; + } + // Otherwise save it so we can fail on the next call + self->error = *errcode; + *errcode = 0; + } + + // Return the data that has been read so far + return org_size - size; +} + +static const mp_rom_map_elem_t bufreader_locals_dict_table[] = { + { MP_ROM_QSTR(MP_QSTR_read), MP_ROM_PTR(&mp_stream_read_obj) }, + { MP_ROM_QSTR(MP_QSTR_readinto), MP_ROM_PTR(&mp_stream_readinto_obj) }, + { MP_ROM_QSTR(MP_QSTR_readline), MP_ROM_PTR(&mp_stream_unbuffered_readline_obj) }, +}; +static MP_DEFINE_CONST_DICT(bufreader_locals_dict, bufreader_locals_dict_table); + +static const mp_stream_p_t 
bufreader_stream_p = { + .read = bufreader_read, +}; + +static MP_DEFINE_CONST_OBJ_TYPE( + mp_type_bufreader, + MP_QSTR_BufferedReader, + MP_TYPE_FLAG_NONE, + make_new, bufwriter_make_new, + protocol, &bufreader_stream_p, + locals_dict, &bufreader_locals_dict + ); + +#endif // MICROPY_PY_IO_BUFFEREDREADER + static const mp_rom_map_elem_t mp_module_io_globals_table[] = { { MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_io) }, // Note: mp_builtin_open_obj should be defined by port, it's not @@ -256,6 +341,9 @@ static const mp_rom_map_elem_t mp_module_io_globals_table[] = { #if MICROPY_PY_IO_BUFFEREDWRITER { MP_ROM_QSTR(MP_QSTR_BufferedWriter), MP_ROM_PTR(&mp_type_bufwriter) }, #endif + #if MICROPY_PY_IO_BUFFEREDREADER + { MP_ROM_QSTR(MP_QSTR_BufferedReader), MP_ROM_PTR(&mp_type_bufreader) }, + #endif }; static MP_DEFINE_CONST_DICT(mp_module_io_globals, mp_module_io_globals_table); diff --git a/py/mpconfig.h b/py/mpconfig.h index 49a9fa35ccf02..8c34481b052bd 100644 --- a/py/mpconfig.h +++ b/py/mpconfig.h @@ -1482,6 +1482,11 @@ typedef double mp_float_t; #define MICROPY_PY_IO_BUFFEREDWRITER (MICROPY_CONFIG_ROM_LEVEL_AT_LEAST_EVERYTHING) #endif +// Whether to provide "io.BufferedReader" class +#ifndef MICROPY_PY_IO_BUFFEREDREADER +#define MICROPY_PY_IO_BUFFEREDREADER (MICROPY_CONFIG_ROM_LEVEL_AT_LEAST_EVERYTHING) +#endif + // Whether to provide "struct" module #ifndef MICROPY_PY_STRUCT #define MICROPY_PY_STRUCT (MICROPY_CONFIG_ROM_LEVEL_AT_LEAST_CORE_FEATURES)