fix!: remove startup logs eof for streaming (#8528) · coder/coder@98164f6 · GitHub
[go: up one dir, main page]

Skip to content

Commit 98164f6

Browse files
kylecarbs and mafredri
authored
fix!: remove startup logs eof for streaming (#8528)
* fix: remove startup logs eof for streaming

  We have external utilities like logstream-kube that may send logs after an agent shuts down unexpectedly to report additional information. In a recent change we stopped accepting these logs, which broke these utilities. In the future we'll rename startup logs to agent logs or something more generalized so this is less confusing.

* fix(cli/cliui): handle never-ending startup log stream in Agent

---------

Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
1 parent 5826588 commit 98164f6

File tree

5 files changed

+82
-239
lines changed
  • codersdk/agentsdk
  • 5 files changed

    +82
    -239
    lines changed

    cli/cliui/agent.go

    Lines changed: 23 additions & 5 deletions
    Original file line numberDiff line numberDiff line change
    @@ -137,26 +137,44 @@ func Agent(ctx context.Context, writer io.Writer, agentID uuid.UUID, opts AgentO
    137137
    }
    138138
    defer logsCloser.Close()
    139139

    140+
    var lastLog codersdk.WorkspaceAgentStartupLog
    141+
    fetchedAgentWhileFollowing := fetchedAgent
    142+
    if !follow {
    143+
    fetchedAgentWhileFollowing = nil
    144+
    }
    140145
    for {
    141146
    // This select is essentially an inline `fetch()`.
    142147
    select {
    143148
    case <-ctx.Done():
    144149
    return ctx.Err()
    145-
    case f := <-fetchedAgent:
    150+
    case f := <-fetchedAgentWhileFollowing:
    146151
    if f.err != nil {
    147152
    return xerrors.Errorf("fetch: %w", f.err)
    148153
    }
    149-
    // We could handle changes in the agent status here, like
    150-
    // if the agent becomes disconnected, we may want to stop.
    151-
    // But for now, we'll just keep going, hopefully the agent
    152-
    // will reconnect and update its status.
    153154
    agent = f.agent
    155+
    156+
    // If the agent is no longer starting, stop following
    157+
    // logs because FetchLogs will keep streaming forever.
    158+
    // We do one last non-follow request to ensure we have
    159+
    // fetched all logs.
    160+
    if !agent.LifecycleState.Starting() {
    161+
    _ = logsCloser.Close()
    162+
    fetchedAgentWhileFollowing = nil
    163+
    164+
    logStream, logsCloser, err = opts.FetchLogs(ctx, agent.ID, lastLog.ID, false)
    165+
    if err != nil {
    166+
    return xerrors.Errorf("fetch workspace agent startup logs: %w", err)
    167+
    }
    168+
    // Logs are already primed, so we can call close.
    169+
    _ = logsCloser.Close()
    170+
    }
    154171
    case logs, ok := <-logStream:
    155172
    if !ok {
    156173
    return nil
    157174
    }
    158175
    for _, log := range logs {
    159176
    sw.Log(log.CreatedAt, log.Level, log.Output)
    177+
    lastLog = log
    160178
    }
    161179
    }
    162180
    }

    cli/cliui/agent_test.go

    Lines changed: 15 additions & 12 deletions
    Original file line numberDiff line numberDiff line change
    @@ -46,7 +46,6 @@ func TestAgent(t *testing.T) {
    4646
    func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
    4747
    agent.Status = codersdk.WorkspaceAgentConnected
    4848
    agent.FirstConnectedAt = ptr.Ref(time.Now())
    49-
    close(logs)
    5049
    return nil
    5150
    },
    5251
    },
    @@ -79,7 +78,6 @@ func TestAgent(t *testing.T) {
    7978
    agent.FirstConnectedAt = ptr.Ref(time.Now())
    8079
    agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady
    8180
    agent.ReadyAt = ptr.Ref(time.Now())
    82-
    close(logs)
    8381
    return nil
    8482
    },
    8583
    },
    @@ -113,10 +111,6 @@ func TestAgent(t *testing.T) {
    113111
    agent.LastConnectedAt = ptr.Ref(time.Now())
    114112
    return nil
    115113
    },
    116-
    func(_ context.Context, _ *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
    117-
    close(logs)
    118-
    return nil
    119-
    },
    120114
    },
    121115
    want: []string{
    122116
    "⧗ The workspace agent lost connection",
    @@ -154,7 +148,6 @@ func TestAgent(t *testing.T) {
    154148
    Output: "Bye now",
    155149
    },
    156150
    }
    157-
    close(logs)
    158151
    return nil
    159152
    },
    160153
    },
    @@ -184,7 +177,6 @@ func TestAgent(t *testing.T) {
    184177
    Output: "Hello world",
    185178
    },
    186179
    }
    187-
    close(logs)
    188180
    return nil
    189181
    },
    190182
    },
    @@ -205,7 +197,6 @@ func TestAgent(t *testing.T) {
    205197
    func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
    206198
    agent.Status = codersdk.WorkspaceAgentDisconnected
    207199
    agent.LifecycleState = codersdk.WorkspaceAgentLifecycleOff
    208-
    close(logs)
    209200
    return nil
    210201
    },
    211202
    },
    @@ -234,7 +225,6 @@ func TestAgent(t *testing.T) {
    234225
    func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error {
    235226
    agent.ReadyAt = ptr.Ref(time.Now())
    236227
    agent.LifecycleState = codersdk.WorkspaceAgentLifecycleShuttingDown
    237-
    close(logs)
    238228
    return nil
    239229
    },
    240230
    },
    @@ -316,8 +306,21 @@ func TestAgent(t *testing.T) {
    316306
    }
    317307
    return agent, err
    318308
    }
    319-
    tc.opts.FetchLogs = func(_ context.Context, _ uuid.UUID, _ int64, _ bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
    320-
    return logs, closeFunc(func() error { return nil }), nil
    309+
    tc.opts.FetchLogs = func(ctx context.Context, _ uuid.UUID, _ int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) {
    310+
    if follow {
    311+
    return logs, closeFunc(func() error { return nil }), nil
    312+
    }
    313+
    314+
    fetchLogs := make(chan []codersdk.WorkspaceAgentStartupLog, 1)
    315+
    select {
    316+
    case <-ctx.Done():
    317+
    return nil, nil, ctx.Err()
    318+
    case l := <-logs:
    319+
    fetchLogs <- l
    320+
    default:
    321+
    }
    322+
    close(fetchLogs)
    323+
    return fetchLogs, closeFunc(func() error { return nil }), nil
    321324
    }
    322325
    err := cliui.Agent(inv.Context(), &buf, uuid.Nil, tc.opts)
    323326
    return err

    coderd/workspaceagents.go

    Lines changed: 44 additions & 97 deletions
    Original file line numberDiff line numberDiff line change
    @@ -280,81 +280,61 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
    280280
    level = append(level, parsedLevel)
    281281
    }
    282282

    283-
    var logs []database.WorkspaceAgentStartupLog
    284-
    // Ensure logs are not written after script ended.
    285-
    scriptEndedError := xerrors.New("startup script has ended")
    286-
    err := api.Database.InTx(func(db database.Store) error {
    287-
    state, err := db.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID)
    288-
    if err != nil {
    289-
    return xerrors.Errorf("workspace agent startup script status: %w", err)
    283+
    logs, err := api.Database.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
    284+
    AgentID: workspaceAgent.ID,
    285+
    CreatedAt: createdAt,
    286+
    Output: output,
    287+
    Level: level,
    288+
    OutputLength: int32(outputLength),
    289+
    })
    290+
    if err != nil {
    291+
    if !database.IsStartupLogsLimitError(err) {
    292+
    httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
    293+
    Message: "Failed to upload startup logs",
    294+
    Detail: err.Error(),
    295+
    })
    296+
    return
    290297
    }
    291-
    292-
    if state.ReadyAt.Valid {
    293-
    // The agent startup script has already ended, so we don't want to
    294-
    // process any more logs.
    295-
    return scriptEndedError
    298+
    if workspaceAgent.StartupLogsOverflowed {
    299+
    httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
    300+
    Message: "Startup logs limit exceeded",
    301+
    Detail: err.Error(),
    302+
    })
    303+
    return
    296304
    }
    297-
    298-
    logs, err = db.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
    299-
    AgentID: workspaceAgent.ID,
    300-
    CreatedAt: createdAt,
    301-
    Output: output,
    302-
    Level: level,
    303-
    OutputLength: int32(outputLength),
    305+
    err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
    306+
    ID: workspaceAgent.ID,
    307+
    StartupLogsOverflowed: true,
    304308
    })
    305-
    return err
    306-
    }, nil)
    307-
    if err != nil {
    308-
    if errors.Is(err, scriptEndedError) {
    309-
    httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{
    310-
    Message: "Failed to upload logs, startup script has already ended.",
    309+
    if err != nil {
    310+
    // We don't want to return here, because the agent will retry
    311+
    // on failure and this isn't a huge deal. The overflow state
    312+
    // is just a hint to the user that the logs are incomplete.
    313+
    api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err))
    314+
    }
    315+
    316+
    resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
    317+
    if err != nil {
    318+
    httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
    319+
    Message: "Failed to get workspace resource.",
    311320
    Detail: err.Error(),
    312321
    })
    313322
    return
    314323
    }
    315-
    if database.IsStartupLogsLimitError(err) {
    316-
    if !workspaceAgent.StartupLogsOverflowed {
    317-
    err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
    318-
    ID: workspaceAgent.ID,
    319-
    StartupLogsOverflowed: true,
    320-
    })
    321-
    if err != nil {
    322-
    // We don't want to return here, because the agent will retry
    323-
    // on failure and this isn't a huge deal. The overflow state
    324-
    // is just a hint to the user that the logs are incomplete.
    325-
    api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err))
    326-
    }
    327324

    328-
    resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
    329-
    if err != nil {
    330-
    httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
    331-
    Message: "Failed to get workspace resource.",
    332-
    Detail: err.Error(),
    333-
    })
    334-
    return
    335-
    }
    336-
    337-
    build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
    338-
    if err != nil {
    339-
    httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
    340-
    Message: "Internal error fetching workspace build job.",
    341-
    Detail: err.Error(),
    342-
    })
    343-
    return
    344-
    }
    345-
    346-
    api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
    347-
    }
    348-
    349-
    httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
    350-
    Message: "Startup logs limit exceeded",
    325+
    build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
    326+
    if err != nil {
    327+
    httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
    328+
    Message: "Internal error fetching workspace build job.",
    351329
    Detail: err.Error(),
    352330
    })
    353331
    return
    354332
    }
    355-
    httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
    356-
    Message: "Failed to upload startup logs",
    357-
    Detail: err.Error(),
    333+
    334+
    api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
    335+
    336+
    httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
    337+
    Message: "Startup logs limit exceeded",
    358338
    })
    359339
    return
    360340
    }
    @@ -497,18 +477,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
    497477
    return
    498478
    }
    499479

    500-
    if workspaceAgent.ReadyAt.Valid {
    501-
    // Fast path, the startup script has finished running, so we can close
    502-
    // the connection.
    503-
    return
    504-
    }
    505-
    if !codersdk.WorkspaceAgentLifecycle(workspaceAgent.LifecycleState).Starting() {
    506-
    // Backwards compatibility: Avoid waiting forever in case this agent is
    507-
    // older than the current release and has already reported the ready
    508-
    // state.
    509-
    return
    510-
    }
    511-
    512480
    lastSentLogID := after
    513481
    if len(logs) > 0 {
    514482
    lastSentLogID = logs[len(logs)-1].ID
    @@ -543,11 +511,9 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
    543511
    t := time.NewTicker(recheckInterval)
    544512
    defer t.Stop()
    545513

    546-
    var state database.GetWorkspaceAgentLifecycleStateByIDRow
    547514
    go func() {
    548515
    defer close(bufferedLogs)
    549516

    550-
    var err error
    551517
    for {
    552518
    select {
    553519
    case <-ctx.Done():
    @@ -557,17 +523,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
    557523
    t.Reset(recheckInterval)
    558524
    }
    559525

    560-
    if !state.ReadyAt.Valid {
    561-
    state, err = api.Database.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID)
    562-
    if err != nil {
    563-
    if xerrors.Is(err, context.Canceled) {
    564-
    return
    565-
    }
    566-
    logger.Warn(ctx, "failed to get workspace agent lifecycle state", slog.Error(err))
    567-
    continue
    568-
    }
    569-
    }
    570-
    571526
    logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{
    572527
    AgentID: workspaceAgent.ID,
    573528
    CreatedAfter: lastSentLogID,
    @@ -580,9 +535,7 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
    580535
    continue
    581536
    }
    582537
    if len(logs) == 0 {
    583-
    if state.ReadyAt.Valid {
    584-
    return
    585-
    }
    538+
    // Just keep listening - more logs might come in the future!
    586539
    continue
    587540
    }
    588541

    @@ -1689,12 +1642,6 @@ func (api *API) workspaceAgentReportLifecycle(rw http.ResponseWriter, r *http.Re
    16891642
    return
    16901643
    }
    16911644

    1692-
    if readyAt.Valid {
    1693-
    api.publishWorkspaceAgentStartupLogsUpdate(ctx, workspaceAgent.ID, agentsdk.StartupLogsNotifyMessage{
    1694-
    EndOfLogs: true,
    1695-
    })
    1696-
    }
    1697-
    16981645
    api.publishWorkspaceUpdate(ctx, workspace.ID)
    16991646

    17001647
    httpapi.Write(ctx, rw, http.StatusNoContent, nil)

    0 commit comments

    Comments
     (0)
    0