8000 chore: unify activity into workspaceapps.StatsCollector by f0ssel · Pull Request #13355 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

chore: unify activity into workspaceapps.StatsCollector #13355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

8000
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
save
  • Loading branch information
f0ssel committed May 28, 2024
commit 4fc5e470113dffabafc485852b3e5e8fb46c2a7f
2 changes: 1 addition & 1 deletion coderd/batchstats/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
)

// Batcher holds a buffer of agent stats and periodically flushes them to
// its configured store. It also updates the workspace's last used time.
// its configured store.
type Batcher struct {
store database.Store
log slog.Logger
Expand Down
3 changes: 2 additions & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,8 @@ type API struct {
healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport]
healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport]

statsBatcher *batchstats.Batcher
statsBatcher *batchstats.Batcher
statsCollector workspaceapps.StatsCollector

Acquirer *provisionerdserver.Acquirer
// dbRolluper rolls up template usage stats from raw agent and app
Expand Down
49 changes: 6 additions & 43 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
Expand Down Expand Up @@ -1167,35 +1167,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
slog.F("payload", req),
)

if req.ConnectionCount > 0 {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(api.TemplateScheduleStore.Load())).Get(ctx, api.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping without the next transition and log it.
if err != nil {
// There's nothing we can do if the query was canceled, the
// client most likely went away so we just return an internal
// server error.
if database.IsQueryCanceledError(err) {
httpapi.InternalServerError(rw, err)
return
}
api.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(time.Now(), workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
agentapi.ActivityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID, nextAutostart)
}

now := dbtime.Now()
protoStats := &agentproto.Stats{
ConnectionsByProto: req.ConnectionsByProto,
ConnectionCount: req.ConnectionCount,
Expand Down Expand Up @@ -1242,19 +1213,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
}
return nil
})
if req.SessionCount() > 0 {
errGroup.Go(func() error {
// nolint:gocritic // (#13146) Will be moved soon as part of refactor.
err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
})
if err != nil {
return xerrors.Errorf("can't update workspace LastUsedAt: %w", err)
}
return nil
})
}
if api.Options.UpdateAgentMetrics != nil {
errGroup.Go(func() error {
user, err := api.Database.GetUserByID(ctx, workspace.OwnerID)
Expand All @@ -1277,6 +1235,11 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
return
}

api.statsCollector.CollectAndFlush(ctx, workspaceapps.StatsReport{
WorkspaceID: workspace.ID,
// TODO: fill out
})

httpapi.Write(ctx, rw, http.StatusOK, agentsdk.StatsResponse{
ReportInterval: api.AgentStatsRefreshInterval,
})
Expand Down
34 changes: 29 additions & 5 deletions coderd/workspaceapps/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"cdr.dev/slog"

agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
Expand Down Expand Up @@ -236,7 +237,10 @@ type StatsCollectorOptions struct {
// RollupWindow is the window size for rolling up stats, session shorter
// than this will be rolled up and longer than this will be tracked
// individually.
RollupWindow time.Duration
RollupWindow time.Duration
DB database.Store
Pubsub pubsub.Pubsub
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]

// Options for tests.
Flush <-chan chan<- struct{}
Expand Down Expand Up @@ -295,11 +299,31 @@ func (sc *StatsCollector) Collect(report StatsReport) {
sc.opts.Logger.Debug(sc.ctx, "collected workspace app stats", slog.F("report", report))
}

func (sc *StatsCollector) CollectAndFlush(ctx context.Context, report StatsReport) error {
sc.Collect(report)
err := sc.flush(ctx)
func (sc *StatsCollector) CollectAgentStat(ctx context.Context, now time.Time, agentID uuid.UUID, workspace database.Workspace, st *agentproto.Stats) error {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(sc.opts.TemplateScheduleStore.Load())).Get(ctx, sc.opts.DB, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping
// without the next transition and log it.
if err != nil {
sc.opts.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(dbtime.Now(), workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
ActivityBumpWorkspace(ctx, sc.opts.Logger.Named("activity_bump"), sc.opts.DB, workspace.ID, nextAutostart)

err := sc.opts.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{})
if err != nil {
return xerrors.Errorf("flushing collector: %w", err)
sc.opts.Logger.Warn(ctx, "failed to publish workspace agent stats",
slog.F("workspace_id", workspace.ID), slog.Error(err))
}

return nil
Expand Down
0