-
-
Notifications
You must be signed in to change notification settings - Fork 8.2k
Add a BufferedReader and allow BufferedWriter to handle partial writes and errors after some data was written. #13390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
klondi
wants to merge
2
commits into
micropython:master
Choose a base branch
from
klondi:bufferedwriter
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,8 @@ | |
#include <assert.h> | ||
#include <string.h> | ||
|
||
#include "py/mpconfig.h" | ||
#include "py/misc.h" | ||
#include "py/runtime.h" | ||
#include "py/builtin.h" | ||
#include "py/stream.h" | ||
|
@@ -107,12 +109,15 @@ | |
|
||
#endif // MICROPY_PY_IO_IOBASE | ||
|
||
#if MICROPY_PY_IO_BUFFEREDWRITER | ||
#if MICROPY_PY_IO_BUFFEREDWRITER || MICROPY_PY_IO_BUFFEREDREADER | ||
|
||
// The structure and the new method are shared by reader and writer | ||
// Object state used by both the buffered writer and the buffered reader.
typedef struct _mp_obj_bufwriter_t { | ||
mp_obj_base_t base; | ||
// Underlying stream object that buffered data is written to / read from.
mp_obj_t stream; | ||
// Capacity of buf in bytes; set once by the constructor.
size_t alloc; | ||
// Number of bytes currently held in buf.
size_t len; | ||
// Error code saved by one call so that a later call can report it; 0 means
// no error is pending.
int error; | ||
// Trailing storage for the buffered bytes.
// NOTE(review): presumably allocated with room for `alloc` bytes by the
// constructor -- the allocation is not visible here, confirm.
byte buf[0]; | ||
} mp_obj_bufwriter_t; | ||
|
||
|
@@ -123,61 +128,100 @@ | |
o->stream = args[0]; | ||
o->alloc = alloc; | ||
o->len = 0; | ||
o->error = 0; | ||
return o; | ||
} | ||
|
||
#endif // MICROPY_PY_IO_BUFFEREDWRITER || MICROPY_PY_IO_BUFFEREDREADER | ||
|
||
#if MICROPY_PY_IO_BUFFEREDWRITER | ||
// Writes out the data stored in the buffer so far | ||
//
// Passes the first self->len bytes of self->buf to mp_stream_write_exactly on
// self->stream. self->len is reduced by the number of bytes that call reports
// as written, and any bytes left over are moved to the start of the buffer so
// that a later call can retry them.
//
// Returns the error code filled in by mp_stream_write_exactly (rv stays 0 if
// that call does not set it).
static int bufwriter_do_write(mp_obj_bufwriter_t *self) { | ||
int rv = 0; | ||
// This cannot return 0 without an error | ||
// NOTE(review): the claim above cannot be verified from this file; confirm it
// against the implementation of mp_stream_write_exactly.
mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->len, &rv); | ||
self->len -= out_sz; | ||
// Copy the non written characters back to the beginning | ||
if (self->len != 0) { | ||
// Use memmove since there might be overlaps | ||
memmove(self->buf, self->buf + out_sz, self->len); | ||
} | ||
return rv; | ||
} | ||
|
||
static mp_uint_t bufwriter_write(mp_obj_t self_in, const void *buf, mp_uint_t size, int *errcode) { | ||
mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in); | ||
|
||
mp_uint_t org_size = size; | ||
// Alloc should always remain the same so cache it. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor, but here the sentence ends with a dot (imo best), but not the other comments. Also the comment below is after its statement instead of on the line above it. |
||
size_t alloc = self->alloc; | ||
mp_uint_t rem = 0; // No data has been copied from the buffer so far | ||
|
||
while (size > 0) { | ||
mp_uint_t rem = self->alloc - self->len; | ||
if (size < rem) { | ||
memcpy(self->buf + self->len, buf, size); | ||
self->len += size; | ||
return org_size; | ||
} | ||
if (self->error != 0) { | ||
*errcode = self->error; | ||
self->error = 0; | ||
return MP_STREAM_ERROR; | ||
} | ||
|
||
// Using this form allows ensuring that we try to empty the buffer if it is | ||
// full without trying to extract any data first. | ||
while (true) { | ||
// Buffer flushing policy here is to flush entire buffer all the time. | ||
// This allows e.g. to have a block device as backing storage and write | ||
// entire block to it. memcpy below is not ideal and could be optimized | ||
// in some cases. But the way it is now it at least ensures that buffer | ||
// is word-aligned, to guard against obscure cases when it matters, e.g. | ||
// https://github.com/micropython/micropython/issues/1863 | ||
// Try to empty the buffer first | ||
if (self->len == alloc) { | ||
if ((*errcode = bufwriter_do_write(self)) != 0) { | ||
// If this is the first write with data from the new buffer | ||
// But no data from the new buffer was written out, | ||
// then remove the data added from the new buffer and raise | ||
// the error | ||
if (org_size == size + rem && self->len >= rem) { | ||
// Remove the extra non-written data from the buffer and error | ||
self->len -= rem; | ||
return MP_STREAM_ERROR; | ||
} | ||
// Some data from the new buffer has been written rollback as much | ||
// as possible and return what was written then raise an error | ||
// on the next call. | ||
size += self->len; | ||
self->len = 0; | ||
self->error = *errcode; | ||
*errcode = 0; | ||
return org_size - size; | ||
} | ||
} | ||
// No data left to write | ||
if (size == 0) { | ||
return org_size; | ||
} | ||
|
||
rem = MIN(alloc - self->len, size); | ||
memcpy(self->buf + self->len, buf, rem); | ||
self->len += rem; | ||
buf = (byte *)buf + rem; | ||
size -= rem; | ||
mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->alloc, errcode); | ||
(void)out_sz; | ||
if (*errcode != 0) { | ||
return MP_STREAM_ERROR; | ||
} | ||
// TODO: try to recover from a case of non-blocking stream, e.g. move | ||
// remaining chunk to the beginning of buffer. | ||
assert(out_sz == self->alloc); | ||
self->len = 0; | ||
} | ||
|
||
return org_size; | ||
} | ||
|
||
static mp_obj_t bufwriter_flush(mp_obj_t self_in) { | ||
mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in); | ||
|
||
if (self->len != 0) { | ||
int err; | ||
mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->len, &err); | ||
(void)out_sz; | ||
// TODO: try to recover from a case of non-blocking stream, e.g. move | ||
// remaining chunk to the beginning of buffer. | ||
assert(out_sz == self->len); | ||
self->len = 0; | ||
if (err != 0) { | ||
mp_raise_OSError(err); | ||
int err = self->error; | ||
if (err == 0 && self->len != 0) { | ||
err = bufwriter_do_write(self); | ||
// If we couldn't flush the whole buffer notify the user. | ||
if (err == 0 && self->len != 0) { | ||
err = MP_EAGAIN; | ||
} | ||
} | ||
|
||
// If there is an error raise it | ||
if (err != 0) { | ||
self->error = 0; | ||
mp_raise_OSError(err); | ||
} | ||
return mp_const_none; | ||
} | ||
static MP_DEFINE_CONST_FUN_OBJ_1(bufwriter_flush_obj, bufwriter_flush); | ||
|
@@ -200,8 +244,88 @@ | |
protocol, &bufwriter_stream_p, | ||
locals_dict, &bufwriter_locals_dict | ||
); | ||
|
||
#endif // MICROPY_PY_IO_BUFFEREDWRITER | ||
|
||
#if MICROPY_PY_IO_BUFFEREDREADER | ||
|
||
// Stream read handler for the buffered reader.
//
// Copies up to `size` bytes into `buf`, refilling the internal buffer from
// self->stream with reads of `alloc` bytes whenever it is empty. Unread bytes
// are always kept at the tail of self->buf, i.e. the next unread byte is at
// self->buf + (alloc - self->len).
//
// Returns the number of bytes copied into `buf` (org_size - size). If an error
// is pending and nothing was copied, stores it in *errcode and returns
// MP_STREAM_ERROR. If an error happens after some bytes were copied, the error
// is saved in self->error, *errcode is cleared, and the partial count is
// returned; the saved error is then reported by the next call.
static mp_uint_t bufreader_read(mp_obj_t self_in, void *buf, mp_uint_t size, int *errcode) { | ||
mp_obj_bufwriter_t *self = MP_OBJ_TO_PTR(self_in); | ||
|
||
mp_uint_t org_size = size; | ||
// Cache this value since it should not change thus avoiding a dereference | ||
size_t alloc = self->alloc; | ||
|
||
// Pick up any error saved by a previous call; a non-zero value skips the
// loop below and is handled after it.
*errcode = self->error; | ||
self->error = 0; | ||
|
||
while (*errcode == 0 && size > 0) { | ||
// Buffer filling policy here is to fill the entire buffer all the time. | ||
// This allows e.g. to have a block device as backing storage and read | ||
// entire blocks from it. memcpy below is not ideal and could be | ||
// optimized in some cases. But the way it is now it at least ensures | ||
// that buffer is word-aligned, to guard against obscure cases when it | ||
// matters, e.g. | ||
// https://github.com/micropython/micropython/issues/1863 | ||
|
||
// Buffer needs to have at least one byte | ||
if (self->len == 0) { | ||
self->len = mp_stream_read_exactly(self->stream, self->buf, alloc, errcode); | ||
if (self->len < alloc) { | ||
// If no data was read stop the loop as it is either an error or an EOF | ||
// This check is moved here because it speeds up the more common case | ||
// where the buffer was completely filled. | ||
if (self->len == 0) { | ||
break; | ||
} | ||
// Buffers may overlap, therefore we use memmove instead of memcpy | ||
// A short read is shifted to the end of the buffer so that the
// (alloc - self->len) offset used below still points at the first
// unread byte.
memmove(self->buf + (alloc - self->len), self->buf, self->len); | ||
} | ||
} | ||
// Hand out as much as is both available and still requested.
mp_uint_t rem = MIN(self->len, size); | ||
memcpy(buf, self->buf + (alloc - self->len), rem); | ||
self->len -= rem; | ||
buf = (byte *)buf + rem; | ||
size -= rem; | ||
} | ||
|
||
// If there is a prior error | ||
if (*errcode != 0) { | ||
// If no data was read return it | ||
if (size == org_size) { | ||
return MP_STREAM_ERROR; | ||
} | ||
// Otherwise save it so we can fail on the next call | ||
self->error = *errcode; | ||
*errcode = 0; | ||
} | ||
|
||
// Return the data that has been read so far | ||
return org_size - size; | ||
} | ||
|
||
// Methods exposed on BufferedReader objects: read, readinto and readline, each
// bound to the corresponding mp_stream_* method object.
static const mp_rom_map_elem_t bufreader_locals_dict_table[] = { | ||
{ MP_ROM_QSTR(MP_QSTR_read), MP_ROM_PTR(&mp_stream_read_obj) }, | ||
{ MP_ROM_QSTR(MP_QSTR_readinto), MP_ROM_PTR(&mp_stream_readinto_obj) }, | ||
{ MP_ROM_QSTR(MP_QSTR_readline), MP_ROM_PTR(&mp_stream_unbuffered_readline_obj) }, | ||
}; | ||
static MP_DEFINE_CONST_DICT(bufreader_locals_dict, bufreader_locals_dict_table); | ||
|
||
// Stream protocol for BufferedReader: only the read handler is set here.
static const mp_stream_p_t bufreader_stream_p = { | ||
.read = bufreader_read, | ||
}; | ||
|
||
// Type object for BufferedReader. Construction reuses bufwriter_make_new, so
// reader and writer objects are created by the same constructor; the reader
// differs in its stream protocol and its locals dict.
static MP_DEFINE_CONST_OBJ_TYPE( | ||
mp_type_bufreader, | ||
MP_QSTR_BufferedReader, | ||
MP_TYPE_FLAG_NONE, | ||
make_new, bufwriter_make_new, | ||
protocol, &bufreader_stream_p, | ||
locals_dict, &bufreader_locals_dict | ||
); | ||
|
||
#endif // MICROPY_PY_IO_BUFFEREDREADER | ||
|
||
static const mp_rom_map_elem_t mp_module_io_globals_table[] = { | ||
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_io) }, | ||
// Note: mp_builtin_open_obj should be defined by port, it's not | ||
|
@@ -217,6 +341,9 @@ | |
#if MICROPY_PY_IO_BUFFEREDWRITER | ||
{ MP_ROM_QSTR(MP_QSTR_BufferedWriter), MP_ROM_PTR(&mp_type_bufwriter) }, | ||
#endif | ||
#if MICROPY_PY_IO_BUFFEREDREADER | ||
{ MP_ROM_QSTR(MP_QSTR_BufferedReader), MP_ROM_PTR(&mp_type_bufreader) }, | ||
#endif | ||
}; | ||
|
||
static MP_DEFINE_CONST_DICT(mp_module_io_globals, mp_module_io_globals_table); | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate on what you mean with this? Like: why would it be an issue for this bit of code if it were 0?