feat: add status watcher to MCP server by code-asher · Pull Request #18320 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

feat: add status watcher to MCP server #18320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add status watcher to MCP server
Since we can now get status updates from two places, they are placed in
a queue so we can handle them one at a time.
  • Loading branch information
code-asher committed Jun 12, 2025
commit 56c41c832b6c3ac3454f36c30df28fe72fd63100
194 changes: 187 additions & 7 deletions cli/exp_mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"net/url"
"os"
"path/filepath"
"slices"
Expand All @@ -15,8 +16,10 @@ import (
"github.com/spf13/afero"
"golang.org/x/xerrors"

agentapi "github.com/coder/agentapi-sdk-go"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/cliui"
"github.com/coder/coder/v2/cli/cliutil"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/toolsdk"
Expand All @@ -25,6 +28,7 @@ import (

// Names of environment variables read by the MCP commands.
const (
// envAppStatusSlug is the environment variable name for the app status slug.
// NOTE(review): presumably this backs the app status slug option of the
// server command; the option's Env field is not visible here, confirm.
envAppStatusSlug = "CODER_MCP_APP_STATUS_SLUG"
// envLLMAgentURL is the environment variable that backs the "llm-agent-url"
// option, i.e. the URL of the LLM agent API used to listen for status updates.
envLLMAgentURL = "CODER_MCP_LLM_AGENT_URL"
)

func (r *RootCmd) mcpCommand() *serpent.Command {
Expand Down Expand Up @@ -347,10 +351,20 @@ func (*RootCmd) mcpConfigureCursor() *serpent.Command {
return cmd
}

// reportTask is a single status update waiting in the MCP server's queue. Items
// are pushed both by the task reporter tool (self-reported) and by the screen
// event watcher, and are consumed one at a time by the reporter goroutine.
type reportTask struct {
// link is sent as the URI of the app status. When empty, the reporter reuses
// the URI of the previous update.
link string
// messageID is the ID of the LLM agent message that triggered this update.
// It is only set for message update events from the watcher; zero otherwise.
messageID int64
// selfReported is true when the update came from the task reporter tool
// rather than from the screen watcher. Self-reported updates are always sent.
selfReported bool
// state is the app status state to report. It is left empty for message
// update events, where the reporter decides the state itself.
state codersdk.WorkspaceAppStatusState
// summary is sent as the message of the app status. When empty, the reporter
// reuses the message of the previous update.
summary string
}

// mcpServer holds the clients and shared state used by the MCP server command
// and its background reporter and watcher goroutines.
type mcpServer struct {
// agentClient is used to patch the app status. The reporter and watcher are
// only started when it is non-nil.
agentClient *agentsdk.Client
// appStatusSlug is the slug of the app whose status is reported.
appStatusSlug string
// client is the Coder API client handed to the tool dependencies.
client *codersdk.Client
// llmClient talks to the LLM agent API and is used to subscribe to screen
// events. It is nil when no LLM agent URL is configured or the client could
// not be created.
llmClient *agentapi.Client
// queue serializes status updates coming from the task reporter tool and the
// screen watcher so they can be handled one at a time.
queue *cliutil.Queue[reportTask]
}

func (r *RootCmd) mcpServer() *serpent.Command {
Expand All @@ -359,12 +373,14 @@ func (r *RootCmd) mcpServer() *serpent.Command {
instructions string
allowedTools []string
appStatusSlug string
llmAgentURL url.URL
)
return &serpent.Command{
Use: "server",
Handler: func(inv *serpent.Invocation) error {
srv := &mcpServer{
appStatusSlug: appStatusSlug,
queue: cliutil.NewQueue[reportTask](10),
}

// Display client URL separately from authentication status.
Expand Down Expand Up @@ -408,8 +424,36 @@ func (r *RootCmd) mcpServer() *serpent.Command {
cliui.Infof(inv.Stderr, "Task reporter : Enabled")
}

// Start the server.
return srv.start(inv, instructions, allowedTools)
// Try to create a client for the LLM agent API, which is used to get the
// screen status to make the status reporting more robust. No auth
// needed, so no validation.
if llmAgentURL.String() == "" {
cliui.Infof(inv.Stderr, "LLM agent URL : Not configured")
} else {
cliui.Infof(inv.Stderr, "LLM agent URL : %s", llmAgentURL.String())
llmClient, err := agentapi.NewClient(llmAgentURL.String())
if err != nil {
cliui.Infof(inv.Stderr, "Screen events : Disabled")
cliui.Warnf(inv.Stderr, "%s must be set", envLLMAgentURL)
} else {
cliui.Infof(inv.Stderr, "Screen events : Enabled")
srv.llmClient = llmClient
}
}

ctx, cancel := context.WithCancel(inv.Context())
defer cancel()
defer srv.queue.Close()

cliui.Infof(inv.Stderr, "Failed to watch screen events")
// Start the reporter, watcher, and server.
if srv.agentClient != nil && appStatusSlug != "" {
srv.startReporter(ctx, inv)
if srv.llmClient != nil {
srv.startWatcher(ctx, inv)
}
}
return srv.startServer(ctx, inv, instructions, allowedTools)
},
Short: "Start the Coder MCP server.",
Middleware: serpent.Chain(
Expand Down Expand Up @@ -438,14 +482,142 @@ func (r *RootCmd) mcpServer() *serpent.Command {
Value: serpent.StringOf(&appStatusSlug),
Default: "",
},
{
Flag: "llm-agent-url",
Description: "The URL of the LLM agent API, used to listen for status updates.",
Env: envLLMAgentURL,
Value: serpent.URLOf(&llmAgentURL),
},
},
}
}

func (s *mcpServer) start(inv *serpent.Invocation, instructions string, allowedTools []string) error {
ctx, cancel := context.WithCancel(inv.Context())
defer cancel()
// startReporter launches a goroutine that pops status updates off the queue
// one at a time and forwards them to the agent API with PatchAppStatus.
//
// Updates are filtered before sending: self-reported updates and terminal
// states are always sent, a new message ID is reported as "working", and
// everything else is dropped. Empty messages and URIs are filled in from the
// previous payload, and a payload identical to the last successfully sent one
// is skipped. The goroutine exits when Pop reports that no more items are
// available. Failures other than context cancellation are logged to
// inv.Stderr; a failed payload is not treated as sent, so an identical
// follow-up update is delivered instead of being dropped as a duplicate.
func (s *mcpServer) startReporter(ctx context.Context, inv *serpent.Invocation) {
	var lastMessageID int64
	// shouldUpdate returns the state to report for item, or "" when the item
	// should be ignored.
	shouldUpdate := func(item reportTask) codersdk.WorkspaceAppStatusState {
		// Always send self-reported updates.
		if item.selfReported {
			return item.state
		}
		// Always send completed states.
		switch item.state {
		case codersdk.WorkspaceAppStatusStateComplete,
			codersdk.WorkspaceAppStatusStateFailure:
			return item.state
		}
		// Always send "working" when there is a new message, since this means the
		// user submitted a message through the API and we know the LLM will begin
		// work soon if it has not already.
		if item.messageID > lastMessageID {
			return codersdk.WorkspaceAppStatusStateWorking
		}
		// Otherwise, if the state is "working" and there have been no new messages,
		// it means either that the LLM is still working or it means the user has
		// interacted with the terminal directly. For now, we are ignoring these
		// updates. This risks missing cases where the user manually submits a new
		// prompt and the LLM becomes active and does not update itself, but it
		// avoids spamming useless status updates.
		return ""
	}
	// lastPayload is the most recent payload we attempted to send. It is used
	// both to carry the message and URI forward and to detect duplicates.
	var lastPayload agentsdk.PatchAppStatus
	// lastSendFailed records whether the attempt to send lastPayload failed, in
	// which case it must not suppress an identical follow-up update.
	var lastSendFailed bool
	go func() {
		for {
			// TODO: Even with the queue, there is still the potential that a message
			// from the screen watcher and a message from the LLM could arrive out of
			// order if the timing is just right. We might want to wait a bit, then
			// check if the status has changed before committing.
			item, ok := s.queue.Pop()
			if !ok {
				return
			}

			state := shouldUpdate(item)
			if state == "" {
				continue
			}

			if item.messageID != 0 {
				lastMessageID = item.messageID
			}

			payload := agentsdk.PatchAppStatus{
				AppSlug: s.appStatusSlug,
				Message: item.summary,
				URI:     item.link,
				State:   state,
			}

			// Preserve previous message and URI.
			if payload.Message == "" {
				payload.Message = lastPayload.Message
			}
			if payload.URI == "" {
				payload.URI = lastPayload.URI
			}

			// Avoid sending duplicate updates, but only when the previous
			// identical payload actually reached the server.
			if !lastSendFailed &&
				lastPayload.State == payload.State &&
				lastPayload.URI == payload.URI &&
				lastPayload.Message == payload.Message {
				continue
			}

			err := s.agentClient.PatchAppStatus(ctx, payload)
			if err != nil && !errors.Is(err, context.Canceled) {
				cliui.Warnf(inv.Stderr, "Failed to report task status: %s", err)
			}

			lastPayload = payload
			lastSendFailed = err != nil
		}
	}()
}

// startWatcher subscribes to events from the LLM agent API and launches a
// goroutine that turns them into queued status updates.
//
// A status change event is queued as "complete" when the screen is stable and
// as "working" otherwise. A message update event is queued with only its
// message ID so the reporter can detect new messages. If the subscription
// cannot be established, a warning is written to inv.Stderr and no goroutine
// is started.
//
// The goroutine stops when ctx is done, when an update cannot be queued, or
// when the error channel yields a value or is closed. A closed events channel
// is no longer polled, so the loop neither spins on zero-value events nor
// misses a pending error.
func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) {
	eventsCh, errCh, err := s.llmClient.SubscribeEvents(ctx)
	if err != nil {
		cliui.Warnf(inv.Stderr, "Failed to watch screen events: %s", err)
		return
	}
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case event, ok := <-eventsCh:
				if !ok {
					// The events channel was closed. A nil channel is never
					// ready in a select, so this disables the case while we
					// wait for the error channel or context cancellation.
					eventsCh = nil
					continue
				}
				switch ev := event.(type) {
				case agentapi.EventStatusChange:
					// If the screen is stable, assume complete.
					state := codersdk.WorkspaceAppStatusStateWorking
					if ev.Status == agentapi.StatusStable {
						state = codersdk.WorkspaceAppStatusStateComplete
					}
					err := s.queue.Push(reportTask{
						state: state,
					})
					if err != nil {
						cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
						return
					}
				case agentapi.EventMessageUpdate:
					err := s.queue.Push(reportTask{
						messageID: ev.Id,
					})
					if err != nil {
						cliui.Warnf(inv.Stderr, "Failed to queue update: %s", err)
						return
					}
				}
			case err, ok := <-errCh:
				// Only warn about a real error; a closed channel yields a nil
				// error that is not worth reporting.
				if ok && err != nil && !errors.Is(err, context.Canceled) {
					cliui.Warnf(inv.Stderr, "Received error from screen event watcher: %s", err)
				}
				return
			}
		}
	}()
}

func (s *mcpServer) startServer(ctx context.Context, inv *serpent.Invocation, instructions string, allowedTools []string) error {
cliui.Infof(inv.Stderr, "Starting MCP server")

cliui.Infof(inv.Stderr, "Instructions : %q", instructions)
Expand Down Expand Up @@ -476,8 +648,16 @@ func (s *mcpServer) start(inv *serpent.Invocation, instructions string, allowedT

// Add tool dependencies.
toolOpts := []func(*toolsdk.Deps){
toolsdk.WithAgentClient(s.agentClient),
toolsdk.WithAppStatusSlug(s.appStatusSlug),
toolsdk.WithTaskReporter(func(args toolsdk.ReportTaskArgs) error {
// TODO: Is it OK to just push and return or should we wait for it to
// actually get dispatched to return any request errors?
return s.queue.Push(reportTask{
link: args.Link,
selfReported: true,
state: codersdk.WorkspaceAppStatusState(args.State),
summary: args.Summary,
})
}),
}

toolDeps, err := toolsdk.NewDeps(s.client, toolOpts...)
Expand Down
Loading
0