Proposal: Transport-agnostic resumable streams #543
Replies: 6 comments 7 replies
-
Universal subscription mechanism
Another thought I had is that transport-agnostic resumable streams provide an alternative way of modeling subscriptions. For example, currently, in order to subscribe to resource updates, the client sends a The transport is left to handle the case where a client might disconnect while the server continues to emit An alternative way of modeling resource subscriptions would be for the client to send a The stream would be resumable regardless of transport, guaranteeing that the client sees all update notifications. There would also be a clear retention policy for update notifications, allowing servers to reclaim resources as appropriate.
-
Statefulness
During the last Hosting WG meeting, there were some questions about statefulness and how this proposal would work with the Streamable HTTP transport, so I wanted to share a high-level sketch. The sketch is not prescriptive; it's just one possible implementation. The steps are as follows:
Streamable HTTP transport compatibility
Because the Streamable HTTP transport is unidirectional, it does not use explicit ACKs. Instead,
-
I might have missed something, but I didn't gather whether this approach will apply to all tools (all tools stream regardless of whether the response is created at once or over time) or only to select tools where streaming responses come more naturally. If this idea allows for the latter, it could also benefit from tool annotations that allow the client to identify streaming vs non-streaming tools: #489
-
Webhooks example
Webhooks aren't part of this proposal, but during the last Hosting WG meeting, I mentioned an example of how this proposal could integrate with webhooks:
Steps 3 and 5-8 can be handled transparently by the SDKs, so the tool author does not need to do anything extra to enable this flow.
-
Backward compatibility and opting in
This proposal does not require that tools be written differently than they are now. The SDKs should be able to handle beginning, resuming, and ending streams in a transparent way (and only when permitted by the negotiated protocol version). The only required change to server code should be configuring If there are no configured values for
-
SDK API for multi-turn tool interactions
The following SDK API suggestion is not part of this proposal; this proposal will work with the current SDK APIs. However, transport-agnostic resumable streams offer a way to scale multi-turn tool interactions: when the server sends a request to the client, such as a sampling request, the server can disconnect and stop execution while waiting for a response.
The current TypeScript SDK (and perhaps other SDKs) makes this difficult because server-to-client requests are handled as promises. The server makes a request and awaits the promise, holding server resources until the client responds.
Below is an example of a suggested API for handling server-to-client requests using discrete functions. To make a request, the server returns a request object from a function, allowing the server to stop execution while waiting for a response. The request object
server.tool(
  "my_long_running_tool",
  inputSchema,
  streaming((inputParams) => {
    // When this function returns, the SDK sends a `stream/begin` notification,
    // then the SDK runs the `perform` callback further below.
    return { resumeInterval: { min: 10, max: 24 * 60 * 60 } };
  }).perform((inputParams, stream) => {
    // This function can close the connection at any time and continue to emit
    // JSON-RPC notifications.
    stream.closeConnection();
    stream.emitProgress(progress);
    // When a stream will be interrupted due to a server-to-client request, the
    // server should explicitly manage its state.
    myStore.set(stream.id, myState);
    // The server can make a request to the client by returning a request
    // object. If the connection is still open, the request will be sent over
    // the connection, and the SDK could then close the connection so that
    // resources aren't consumed while waiting for a response. If the
    // connection was already closed, the request will be stored along with
    // emitted notifications.
    return stream.samplingRequest(prompt);
  }).resume((samplingResult, stream) => {
    // Restore state from the previous callback:
    let previousState = myStore.get(stream.id);
    // Emit result:
    stream.emitResult(result);
    // This function does not return a value, so the SDK sends a `stream/end`
    // notification.
  }).finalize((stream) => {
    // This function is called after the stream ends or is abandoned.
    // Clear state from external store to free resources:
    myStore.delete(stream.id);
  })
);
-
Pre-submission Checklist
Your Idea
Motivation
Stream resumability as currently defined faces multiple issues:
`Last-Event-ID`.
Proposal
To address these issues, I'd like to propose transport-agnostic resumable streams. The basic elements are:
`stream/begin` JSON-RPC notification
- The server sends a `stream/begin` JSON-RPC notification with a `requestId` and `streamId`.
  - `requestId` is the request's `id` property.
  - `streamId` is a generated unique ID for the stream.
- `stream/begin` also includes a `resumeInterval` param with `min` and `max` properties.
  - `resumeInterval.min` indicates the minimum amount of time the client should wait before resuming the stream after a disconnect.
  - `resumeInterval.max` indicates the maximum amount of time the client may wait before resuming the stream after a disconnect. If this limit is exceeded, the stream is considered "abandoned", and the server may delete all data and cancel all work related to the stream. A value of 0 indicates that the stream is not resumable, and that the work will be cancelled upon disconnect.
- After `stream/begin` is sent, both the server and client may disconnect at will. (We will need to amend this part of the spec.)
`stream/end` JSON-RPC notification
- The server sends a `stream/end` JSON-RPC notification with the `streamId` to mark the end of the stream.
`stream/resume` JSON-RPC notification (or request?)
- The client sends a `stream/resume` JSON-RPC notification with a `streamId` to resume that stream on the current connection. The server should then send all JSON-RPC messages for that stream that were not previously sent to the client.
- One option is to make `stream/resume` a JSON-RPC request instead of a JSON-RPC notification. As such, the server would immediately send a JSON-RPC response either confirming the resume or reporting an error for an invalid `streamId`. However, this would inject an extra message into the stream, which might slightly complicate replay mechanisms.
- If the `streamId` is invalid, the server should immediately send a `stream/end` notification.
`stream/poll` JSON-RPC request
- The client can send a `stream/poll` JSON-RPC request with a `streamId`, and the server should respond with the status of the stream, including whether there are pending JSON-RPC messages and whether the stream has ended.
- Upon receiving a `stream/poll` request, the server should reset the "abandoned" timer for the stream. However, the server may choose not to do so based on internal limits for stream age.
Server-to-client requests
Handling server-to-client requests (e.g. sampling requests) can be tricky, depending on the transport. There are two points that I think are important:
Following from those points, I think unidirectional transports such as HTTP should disconnect when the server sends a request to the client during a stream, including during a stream initiated by `stream/resume`.
Also, following from the 2nd point, I think server-to-client requests should embed the stream ID in the request ID. That way, if a response is received on a new connection, the stream ID can be extracted from the response (which will have the same ID as the request), and the stream can resume on that connection.
Though not part of this proposal, I have a suggestion for an SDK API that leverages resumable streams to improve scalability in the face of server-to-client requests.
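To make the idea of embedding the stream ID in the request ID concrete, here is a minimal sketch. The `<streamId>:<sequence>` layout, the `:` separator, and the function names `makeRequestId` and `extractStreamId` are assumptions chosen for illustration; the proposal does not prescribe a format.

```typescript
// Hypothetical request-ID format: "<streamId>:<sequence>". The separator and
// layout are assumptions for illustration; the proposal does not fix a format.
const SEPARATOR = ":";

// Build a request ID for a server-to-client request sent during a stream.
function makeRequestId(streamId: string, sequence: number): string {
  if (streamId.includes(SEPARATOR)) {
    throw new Error(`streamId must not contain "${SEPARATOR}"`);
  }
  return `${streamId}${SEPARATOR}${sequence}`;
}

// Returns the embedded stream ID, or undefined if the ID does not follow the
// assumed format (e.g. it belongs to an ordinary, non-stream request).
function extractStreamId(requestId: string | number): string | undefined {
  if (typeof requestId !== "string") return undefined;
  const index = requestId.lastIndexOf(SEPARATOR);
  if (index <= 0) return undefined;
  return requestId.slice(0, index);
}

// A response arriving on a new connection carries the same ID as the request,
// so the server can work out which stream to resume on that connection.
const requestId = makeRequestId("stream-42", 1);
const response = { jsonrpc: "2.0", id: requestId, result: { text: "ok" } };
console.log(extractStreamId(response.id)); // "stream-42"
```

With a format like this, the server would not need a separate lookup table from request IDs to streams, at the cost of reserving the separator character in stream IDs.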
Batching and multiplexing
Support for JSON-RPC batching was recently removed from the spec; however, when using a unidirectional transport such as HTTP, there are a couple of compelling use cases related to streams:
- Sending multiple `stream/poll` requests on a single connection.
- Sending multiple `stream/resume` notifications on a single connection to multiplex streams.
Webhooks
I see webhooks as more of a transport-level concern, so webhooks are not part of this proposal. However, webhook support could be based on resumable streams. For a detailed example, see #543 (comment).
Asynchronous long-running tasks
This proposal is a generalization of my proposed approach to asynchronous long-running tasks, and supersedes that proposal. I think the basic elements in this proposal provide a solid base for handling asynchronous long-running tasks and can be extended as necessary with features like batching and webhooks.
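As a rough illustration of how a server might keep track of those basic elements, the sketch below buffers messages per stream, replays them on `stream/resume`, answers `stream/poll`, and drops streams that exceed `resumeInterval.max`. Everything here is an assumption for illustration: the names `StreamStore` and `StreamRecord` are hypothetical, `resumeInterval` is assumed to be in seconds, the "abandoned" timer is simplified to time since last contact, and an unknown stream on poll is reported as `undefined`.

```typescript
// Minimal in-memory sketch of server-side stream bookkeeping. All names here
// (StreamStore, StreamRecord) are hypothetical and not part of any SDK.
type JsonRpcMessage = { jsonrpc: "2.0"; method: string; params?: unknown };

interface StreamRecord {
  pending: JsonRpcMessage[]; // messages not yet delivered to the client
  ended: boolean;
  maxResumeSeconds: number; // resumeInterval.max, assumed to be in seconds
  lastContactMs: number; // simplification: time of begin, resume, or poll
}

class StreamStore {
  private streams = new Map<string, StreamRecord>();

  begin(streamId: string, maxResumeSeconds: number, nowMs: number): void {
    this.streams.set(streamId, { pending: [], ended: false, maxResumeSeconds, lastContactMs: nowMs });
  }

  // Buffer a message emitted while the client is disconnected.
  emit(streamId: string, message: JsonRpcMessage): void {
    const record = this.streams.get(streamId);
    if (!record) throw new Error(`unknown stream ${streamId}`);
    record.pending.push(message);
    if (message.method === "stream/end") record.ended = true;
  }

  // stream/resume: return undelivered messages. An invalid or abandoned
  // stream yields a single stream/end notification.
  resume(streamId: string, nowMs: number): JsonRpcMessage[] {
    const record = this.live(streamId, nowMs);
    if (!record) return [{ jsonrpc: "2.0", method: "stream/end", params: { streamId } }];
    record.lastContactMs = nowMs;
    const messages = record.pending;
    record.pending = [];
    if (record.ended) this.streams.delete(streamId);
    return messages;
  }

  // stream/poll: report status and reset the "abandoned" timer.
  poll(streamId: string, nowMs: number): { pending: boolean; ended: boolean } | undefined {
    const record = this.live(streamId, nowMs);
    if (!record) return undefined; // assumption: caller maps this to an error
    record.lastContactMs = nowMs;
    return { pending: record.pending.length > 0, ended: record.ended };
  }

  // Drop the stream if the client waited longer than resumeInterval.max.
  private live(streamId: string, nowMs: number): StreamRecord | undefined {
    const record = this.streams.get(streamId);
    if (!record) return undefined;
    if (nowMs - record.lastContactMs > record.maxResumeSeconds * 1000) {
      this.streams.delete(streamId);
      return undefined;
    }
    return record;
  }
}

const store = new StreamStore();
store.begin("s1", 60, 0);
store.emit("s1", { jsonrpc: "2.0", method: "notifications/progress", params: { progress: 1 } });
console.log(store.poll("s1", 10000)); // { pending: true, ended: false }
console.log(store.resume("s1", 20000).length); // 1
```

Passing the clock in as `nowMs` keeps the sketch deterministic; a real server would more likely use timers and persistent storage, and would start the "abandoned" timer at disconnect rather than at last contact.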
Scope