8000 gh-76785: Clean Up the Channels Module by ericsnowcurrently · Pull Request #110568 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

gh-76785: Clean Up the Channels Module #110568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix some doc comments.
  • Loading branch information
ericsnowcurrently committed Oct 17, 2023
commit 6e775431154af9db52d7f9e28bf98fd8d842db3f
23 changes: 14 additions & 9 deletions Modules/_xxinterpchannelsmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,7 @@ channel_destroy(_channels *channels, int64_t id)
}

// Push an object onto the channel.
// The current interpreter gets associated with the send end of the channel.
// Optionally request to be notified when it is received.
static int
channel_send(_channels *channels, int64_t id, PyObject *obj,
Expand All @@ -1831,6 +1832,7 @@ channel_send(_channels *channels, int64_t id, PyObject *obj,
if (interp == NULL) {
return -1;
}
int64_t interpid = PyInterpreterState_GetID(interp);

// Look up the channel.
PyThread_type_lock mutex = NULL;
Expand Down Expand Up @@ -1860,8 +1862,7 @@ channel_send(_channels *channels, int64_t id, PyObject *obj,
}

// Add the data to the channel.
int res = _channel_add(chan, PyInterpreterState_GetID(interp),
data, waiting);
int res = _channel_add(chan, interpid, data, waiting);
PyThread_release_lock(mutex);
if (res != 0) {
// We may chain an exception here:
Expand Down Expand Up @@ -1948,6 +1949,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
}

// Pop the next object off the channel. Fail if empty.
// The current interpreter gets associated with the recv end of the channel.
// XXX Support a "wait" mutex?
static int
channel_recv(_channels *channels, int64_t id, PyObject **res)
Expand All @@ -1963,6 +1965,7 @@ channel_recv(_channels *channels, int64_t id, PyObject **res)
}
return 0;
}
int64_t interpid = PyInterpreterState_GetID(interp);

// Look up the channel.
PyThread_type_lock mutex = NULL;
Expand All @@ -1977,8 +1980,7 @@ channel_recv(_channels *channels, int64_t id, PyObject **res)
// Pop off the next item from the channel.
_PyCrossInterpreterData *data = NULL;
_waiting_t *waiting = NULL;
err = _channel_next(chan, PyInterpreterState_GetID(interp), &data,
&waiting);
err = _channel_next(chan, interpid, &data, &waiting);
PyThread_release_lock(mutex);
if (err != 0) {
return err;
Expand Down Expand Up @@ -2024,29 +2026,30 @@ channel_recv(_channels *channels, int64_t id, PyObject **res)
// The channel is marked as closed if no other interpreters
// are currently associated.
static int
channel_drop(_channels *channels, int64_t id, int send, int recv)
channel_release(_channels *channels, int64_t cid, int send, int recv)
{
PyInterpreterState *interp = _get_current_interp();
if (interp == NULL) {
return -1;
}
int64_t interpid = PyInterpreterState_GetID(interp);

// Look up the channel.
PyThread_type_lock mutex = NULL;
_PyChannelState *chan = NULL;
int err = _channels_lookup(channels, id, &mutex, &chan);
int err = _channels_lookup(channels, cid, &mutex, &chan);
if (err != 0) {
return err;
}
// Past this point we are responsible for releasing the mutex.

// Close one or both of the two ends.
int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
int res = _channel_close_interpreter(chan, interpid, send-recv);
PyThread_release_lock(mutex);
return res;
}

// Close the channel. Fail if it's already closed.
// Close the channel (for all interpreters). Fail if it's already closed.
// Close immediately if it's empty. Otherwise, disallow sending and
// finally close once empty. Optionally, immediately clear and close it.
static int
Expand All @@ -2055,6 +2058,8 @@ channel_close(_channels *channels, int64_t id, int end, int force)
return _channels_close(channels, id, NULL, end, force);
}

// Return true if the identified interpreter is associated
// with the given end of the channel.
static int
channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
int send)
Expand Down Expand Up @@ -3052,7 +3057,7 @@ channelsmod_release(PyObject *self, PyObject *args, PyObject *kwds)
// XXX Handle force is True.
// XXX Fix implicit release.

int err = channel_drop(&_globals.channels, cid, send, recv);
int err = channel_release(&_globals.channels, cid, send, recv);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
Expand Down
0