fix(agent/agentcontainers): split Init into Init and Start for early API responses by mafredri · Pull Request #18640 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

fix(agent/agentcontainers): split Init into Init and Start for early API responses #18640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,11 +1158,26 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
}
}

scripts := manifest.Scripts
var (
scripts = manifest.Scripts
devcontainerScripts map[uuid.UUID]codersdk.WorkspaceAgentScript
)
if a.containerAPI != nil {
// Init the container API with the manifest and client so that
// we can start accepting requests. The final start of the API
// happens after the startup scripts have been executed to
// ensure the presence of required tools. This means we can
// return existing devcontainers but actual container detection
// and creation will be deferred.
a.containerAPI.Init(
agentcontainers.WithManifestInfo(manifest.OwnerName, manifest.WorkspaceName, manifest.AgentName),
agentcontainers.WithDevcontainers(manifest.Devcontainers, manifest.Scripts),
agentcontainers.WithSubAgentClient(agentcontainers.NewSubAgentClientFromAPI(a.logger, aAPI)),
)

// Since devcontainers are enabled, remove devcontainer scripts
// from the main scripts list to avoid showing an error.
scripts, _ = agentcontainers.ExtractDevcontainerScripts(manifest.Devcontainers, manifest.Scripts)
scripts, devcontainerScripts = agentcontainers.ExtractDevcontainerScripts(manifest.Devcontainers, scripts)
}
err = a.scriptRunner.Init(scripts, aAPI.ScriptCompleted)
if err != nil {
Expand All @@ -1183,13 +1198,10 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
err := a.scriptRunner.Execute(a.gracefulCtx, agentscripts.ExecuteStartScripts)

if a.containerAPI != nil {
a.containerAPI.Init(
agentcontainers.WithManifestInfo(manifest.OwnerName, manifest.WorkspaceName, manifest.AgentName),
agentcontainers.WithDevcontainers(manifest.Devcontainers, manifest.Scripts),
agentcontainers.WithSubAgentClient(agentcontainers.NewSubAgentClientFromAPI(a.logger, aAPI)),
)

_, devcontainerScripts := agentcontainers.ExtractDevcontainerScripts(manifest.Devcontainers, manifest.Scripts)
// Start the container API after the startup scripts have
// been executed to ensure that the required tools can be
// installed.
a.containerAPI.Start()
for _, dc := range manifest.Devcontainers {
cErr := a.createDevcontainer(ctx, aAPI, dc, devcontainerScripts[dc.ID])
err = errors.Join(err, cErr)
Expand Down
41 changes: 28 additions & 13 deletions agent/agentcontainers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type API struct {
cancel context.CancelFunc
watcherDone chan struct{}
updaterDone chan struct{}
initialUpdateDone chan struct{} // Closed after first update in updaterLoop.
updateTrigger chan chan error // Channel to trigger manual refresh.
updateInterval time.Duration // Interval for periodic container updates.
logger slog.Logger
Expand All @@ -73,7 +72,8 @@ type API struct {
workspaceName string
parentAgent string

mu sync.RWMutex
mu sync.RWMutex // Protects the following fields.
initDone chan struct{} // Closed by Init.
closed bool
containers codersdk.WorkspaceAgentListContainersResponse // Output from the last list operation.
containersErr error // Error from the last list operation.
Expand Down Expand Up @@ -270,7 +270,7 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
api := &API{
ctx: ctx,
cancel: cancel,
initialUpdateDone: make(chan struct{}),
initDone: make(chan struct{}),
updateTrigger: make(chan chan error),
updateInterval: defaultUpdateInterval,
logger: logger,
Expand Down Expand Up @@ -322,18 +322,37 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
}

// Init applies a final set of options to the API and then
// begins the watcherLoop and updaterLoop. This function
// must only be called once.
// closes initDone. This method can only be called once.
func (api *API) Init(opts ...Option) {
// Hold the write lock for the whole call so that the closed check, the
// initDone check and the option application all happen under api.mu.
api.mu.Lock()
defer api.mu.Unlock()
// Nothing to initialize once the API has been closed; options are
// not applied and initDone is left untouched.
if api.closed {
return
}
// Non-blocking check of initDone. In the code visible here the channel
// is only ever closed (by the defer below), so a successful receive
// means a previous Init call already completed and this call is a no-op.
select {
case <-api.initDone:
return
default:
}
// Deferred calls run in LIFO order, so initDone is closed after the
// options below have been applied and before the mutex is released.
// Closing it unblocks anything waiting on initDone.
defer close(api.initDone)

// Apply the caller-supplied options to the API while holding the lock.
for _, opt := range opts {
opt(api)
}
}

// Start starts the API by initializing the watcher and updater loops.
// This method calls Init; to apply options after the API has been
// created, call Init with those options before calling Start. This
// method must only be called once.
func (api *API) Start() {
api.Init()

api.mu.Lock()
defer api.mu.Unlock()
if api.closed {
return
}

api.watcherDone = make(chan struct{})
api.updaterDone = make(chan struct{})
Expand Down Expand Up @@ -412,9 +431,6 @@ func (api *API) updaterLoop() {
} else {
api.logger.Debug(api.ctx, "initial containers update complete")
}
// Signal that the initial update attempt (successful or not) is done.
// Other services can wait on this if they need the first data to be available.
close(api.initialUpdateDone)

// We utilize a TickerFunc here instead of a regular Ticker so that
// we can guarantee execution of the updateContainers method after
Expand Down Expand Up @@ -474,7 +490,7 @@ func (api *API) UpdateSubAgentClient(client SubAgentClient) {
func (api *API) Routes() http.Handler {
r := chi.NewRouter()

ensureInitialUpdateDoneMW := func(next http.Handler) http.Handler {
ensureInitDoneMW := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
select {
case <-api.ctx.Done():
Expand All @@ -485,9 +501,8 @@ func (api *API) Routes() http.Handler {
return
case <-r.Context().Done():
return
case <-api.initialUpdateDone:
// Initial update is done, we can start processing
// requests.
case <-api.initDone:
// API init is done, we can start processing requests.
}
next.ServeHTTP(rw, r)
})
Expand All @@ -496,7 +511,7 @@ func (api *API) Routes() http.Handler {
// For now, all endpoints require API init to be done.
// If we want to allow some endpoints to be available before
// init has completed, we can enable this per-route.
r.Use(ensureInitialUpdateDoneMW)
r.Use(ensureInitDoneMW)

r.Get("/", api.handleList)
// TODO(mafredri): Simplify this route as the previous /devcontainers
Expand Down
31 changes: 17 additions & 14 deletions agent/agentcontainers/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithContainerCLI(mLister),
agentcontainers.WithContainerLabelIncludeFilter("this.label.does.not.exist.ignore.devcontainers", "true"),
)
api.Init()
api.Start()
defer api.Close()
r.Mount("/", api.Routes())

Expand Down Expand Up @@ -627,7 +627,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithDevcontainers(tt.setupDevcontainers, nil),
)

api.Init()
api.Start()
defer api.Close()
r.Mount("/", api.Routes())

Expand Down Expand Up @@ -1068,7 +1068,7 @@ func TestAPI(t *testing.T) {
}

api := agentcontainers.NewAPI(logger, apiOptions...)
api.Init()
api.Start()
defer api.Close()

r.Mount("/", api.Routes())
Expand Down Expand Up @@ -1158,7 +1158,7 @@ func TestAPI(t *testing.T) {
[]codersdk.WorkspaceAgentScript{{LogSourceID: uuid.New(), ID: dc.ID}},
),
)
api.Init()
api.Start()
defer api.Close()

// Make sure the ticker function has been registered
Expand Down Expand Up @@ -1254,7 +1254,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithWatcher(fWatcher),
agentcontainers.WithClock(mClock),
)
api.Init()
api.Start()
defer api.Close()

r := chi.NewRouter()
Expand Down Expand Up @@ -1408,7 +1408,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithDevcontainerCLI(fakeDCCLI),
agentcontainers.WithManifestInfo("test-user", "test-workspace", "test-parent-agent"),
)
api.Init()
api.Start()
apiClose := func() {
closeOnce.Do(func() {
// Close before api.Close() defer to avoid deadlock after test.
Expand Down Expand Up @@ -1635,7 +1635,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithSubAgentClient(fakeSAC),
agentcontainers.WithDevcontainerCLI(&fakeDevcontainerCLI{}),
)
api.Init()
api.Start()
defer api.Close()

tickerTrap.MustWait(ctx).MustRelease(ctx)
Expand Down Expand Up @@ -1958,7 +1958,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithSubAgentURL("test-subagent-url"),
agentcontainers.WithWatcher(watcher.NewNoop()),
)
api.Init()
api.Start()
defer api.Close()

// Close before api.Close() defer to avoid deadlock after test.
Expand Down Expand Up @@ -2052,7 +2052,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithSubAgentURL("test-subagent-url"),
agentcontainers.WithWatcher(watcher.NewNoop()),
)
api.Init()
api.Start()
defer api.Close()

// Close before api.Close() defer to avoid deadlock after test.
Expand Down Expand Up @@ -2158,7 +2158,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithWatcher(watcher.NewNoop()),
agentcontainers.WithManifestInfo("test-user", "test-workspace", "test-parent-agent"),
)
api.Init()
api.Start()
defer api.Close()

// Close before api.Close() defer to avoid deadlock after test.
Expand Down Expand Up @@ -2228,7 +2228,7 @@ func TestAPI(t *testing.T) {
agentcontainers.WithExecer(fakeExec),
agentcontainers.WithCommandEnv(commandEnv),
)
api.Init()
api.Start()
defer api.Close()

// Call RefreshContainers directly to trigger CommandEnv usage.
Expand Down Expand Up @@ -2318,13 +2318,16 @@ func TestAPI(t *testing.T) {
agentcontainers.WithWatcher(fWatcher),
agentcontainers.WithClock(mClock),
)
api.Init()
api.Start()
defer func() {
close(fakeSAC.createErrC)
close(fakeSAC.deleteErrC)
api.Close()
}()

err := api.RefreshContainers(ctx)
require.NoError(t, err, "RefreshContainers should not error")

r := chi.NewRouter()
r.Mount("/", api.Routes())

Expand All @@ -2335,7 +2338,7 @@ func TestAPI(t *testing.T) {
require.Equal(t, http.StatusOK, rec.Code)

var response codersdk.WorkspaceAgentListContainersResponse
err := json.NewDecoder(rec.Body).Decode(&response)
err = json.NewDecoder(rec.Body).Decode(&response)
require.NoError(t, err)

assert.Empty(t, response.Devcontainers, "ignored devcontainer should not be in response when ignore=true")
Expand Down Expand Up @@ -2519,7 +2522,7 @@ func TestSubAgentCreationWithNameRetry(t *testing.T) {
agentcontainers.WithSubAgentClient(fSAC),
agentcontainers.WithWatcher(watcher.NewNoop()),
)
api.Init()
api.Start()
defer api.Close()

tickerTrap.MustWait(ctx).MustRelease(ctx)
Expand Down
Loading
0