8000 fix: stop incrementing activity on empty agent stats by f0ssel · Pull Request #15204 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

fix: stop incrementing activity on empty agent stats #15204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions coderd/agentapi/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func TestUpdateStates(t *testing.T) {
}
batcher = &workspacestatstest.StatsBatcher{}
updateAgentMetricsFnCalled = false
tickCh = make(chan time.Time)
flushCh = make(chan int, 1)
wut = workspacestats.NewTracker(dbM,
workspacestats.TrackerWithTickFlush(tickCh, flushCh),
)

req = &agentproto.UpdateStatsRequest{
Stats: &agentproto.Stats{
Expand Down Expand Up @@ -108,6 +113,7 @@ func TestUpdateStates(t *testing.T) {
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
UsageTracker: wut,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
Expand All @@ -125,28 +131,29 @@ func TestUpdateStates(t *testing.T) {
return now
},
}
defer wut.Close()

// Workspace gets fetched.
dbM.EXPECT().GetWorkspaceByAgentID(gomock.Any(), agent.ID).Return(database.GetWorkspaceByAgentIDRow{
Workspace: workspace,
TemplateName: template.Name,
}, nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)

// We expect an activity bump because ConnectionCount > 0.
dbM.EXPECT().ActivityBumpWorkspace(gomock.Any(), database.ActivityBumpWorkspaceParams{
WorkspaceID: workspace.ID,
NextAutostart: time.Time{}.UTC(),
}).Return(nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
dbM.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: []uuid.UUID{workspace.ID},
LastUsedAt: now,
}).Return(nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)

// Ensure that pubsub notifications are sent.
notifyDescription := make(chan []byte)
ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) {
Expand All @@ -161,6 +168,10 @@ func TestUpdateStates(t *testing.T) {
ReportInterval: durationpb.New(10 * time.Second),
}, resp)

tickCh <- now
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

batcher.Mu.Lock()
defer batcher.Mu.Unlock()
require.Equal(t, int64(1), batcher.Called)
Expand Down Expand Up @@ -213,6 +224,7 @@ func TestUpdateStates(t *testing.T) {
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
UsageTracker: workspacestats.NewTracker(dbM),
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
// Ignored when nil.
Expand All @@ -230,12 +242,6 @@ func TestUpdateStates(t *testing.T) {
TemplateName: template.Name,
}, nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
}).Return(nil)

_, err := api.UpdateStats(context.Background(), req)
require.NoError(t, err)
})
Expand Down Expand Up @@ -311,6 +317,11 @@ func TestUpdateStates(t *testing.T) {
}
batcher = &workspacestatstest.StatsBatcher{}
updateAgentMetricsFnCalled = false
tickCh = make(chan time.Time)
flushCh = make(chan int, 1)
wut = workspacestats.NewTracker(dbM,
workspacestats.TrackerWithTickFlush(tickCh, flushCh),
)

req = &agentproto.UpdateStatsRequest{
Stats: &agentproto.Stats{
Expand All @@ -330,6 +341,7 @@ func TestUpdateStates(t *testing.T) {
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
UsageTracker: wut,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
Expand All @@ -348,6 +360,7 @@ func TestUpdateStates(t *testing.T) {
return now
},
}
defer wut.Close()

// Workspace gets fetched.
dbM.EXPECT().GetWorkspaceByAgentID(gomock.Any(), agent.ID).Return(database.GetWorkspaceByAgentIDRow{
Expand All @@ -363,9 +376,9 @@ func TestUpdateStates(t *testing.T) {
}).Return(nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
dbM.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: []uuid.UUID{workspace.ID},
LastUsedAt: now.UTC(),
}).Return(nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
Expand All @@ -377,6 +390,10 @@ func TestUpdateStates(t *testing.T) {
ReportInterval: durationpb.New(15 * time.Second),
}, resp)

tickCh <- now
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

require.True(t, updateAgentMetricsFnCalled)
})

Expand All @@ -400,6 +417,11 @@ func TestUpdateStates(t *testing.T) {
}
batcher = &workspacestatstest.StatsBatcher{}
updateAgentMetricsFnCalled = false
tickCh = make(chan time.Time)
flushCh = make(chan int, 1)
wut = workspacestats.NewTracker(dbM,
workspacestats.TrackerWithTickFlush(tickCh, flushCh),
)

req = &agentproto.UpdateStatsRequest{
Stats: &agentproto.Stats{
Expand Down Expand Up @@ -430,6 +452,7 @@ func TestUpdateStates(t *testing.T) {
},
}
)
defer wut.Close()
api := agentapi.StatsAPI{
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
Expand All @@ -439,6 +462,7 @@ func TestUpdateStates(t *testing.T) {
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
UsageTracker: wut,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
Expand Down Expand Up @@ -473,8 +497,8 @@ func TestUpdateStates(t *testing.T) {
}).Return(nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
dbM.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: []uuid.UUID{workspace.ID},
LastUsedAt: now,
}).Return(nil)

Expand All @@ -495,6 +519,10 @@ func TestUpdateStates(t *testing.T) {
ReportInterval: durationpb.New(10 * time.Second),
}, resp)

tickCh <- now
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

batcher.Mu.Lock()
defer batcher.Mu.Unlock()
require.EqualValues(t, 1, batcher.Called)
Expand Down
23 changes: 20 additions & 3 deletions coderd/httpapi/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package httpapi

import (
"context"
"errors"
"time"

"golang.org/x/xerrors"
"nhooyr.io/websocket"

"cdr.dev/slog"
Expand Down Expand Up @@ -31,7 +33,8 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) {
// Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping
// failure.
func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) {
ticker := time.NewTicker(15 * time.Second)
interval := 15 * A3E2 time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
Expand All @@ -40,12 +43,26 @@ func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *
return
case <-ticker.C:
}
err := conn.Ping(ctx)
err := pingWithTimeout(ctx, conn, interval)
if err != nil {
// context.DeadlineExceeded is expected when the client disconnects without sending a close frame
if !errors.Is(err, context.DeadlineExceeded) {
logger.Error(ctx, "failed to heartbeat ping", slog.Error(err))
}
_ = conn.Close(websocket.StatusGoingAway, "Ping failed")
logger.Info(ctx, "failed to heartbeat ping", slog.Error(err))
exit()
return
}
}
}

func pingWithTimeout(ctx context.Context, conn *websocket.Conn, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := conn.Ping(ctx)
if err != nil {
return xerrors.Errorf("failed to ping: %w", err)
}

return nil
}
6 changes: 2 additions & 4 deletions coderd/insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,14 +700,13 @@ func TestTemplateInsights_Golden(t *testing.T) {
connectionCount = 0
}
for createdAt.Before(stat.endedAt) {
err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
ConnectionCount: connectionCount,
SessionCountVscode: stat.sessionCountVSCode,
SessionCountJetbrains: stat.sessionCountJetBrains,
SessionCountReconnectingPty: stat.sessionCountReconnectingPTY,
SessionCountSsh: stat.sessionCountSSH,
}, false)
require.NoError(t, err, "want no error inserting agent stats")
createdAt = createdAt.Add(30 * time.Second)
}
}
Expand Down Expand Up @@ -1599,14 +1598,13 @@ func TestUserActivityInsights_Golden(t *testing.T) {
connectionCount = 0
}
for createdAt.Before(stat.endedAt) {
err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
ConnectionCount: connectionCount,
SessionCountVscode: stat.sessionCountVSCode,
SessionCountJetbrains: stat.sessionCountJetBrains,
SessionCountReconnectingPty: stat.sessionCountReconnectingPTY,
SessionCountSsh: stat.sessionCountSSH,
}, false)
require.NoError(t, err, "want no error inserting agent stats")
createdAt = createdAt.Add(30 * time.Second)
}
}
Expand Down
13 changes: 12 additions & 1 deletion coderd/workspaceagentsrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package coderd_test
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbfake"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/testutil"
Expand All @@ -20,7 +22,12 @@ import (
func TestWorkspaceAgentReportStats(t *testing.T) {
t.Parallel()

client, db := coderdtest.NewWithDatabase(t, nil)
tickCh := make(chan time.Time)
flushCh := make(chan int, 1)
client, db := coderdtest.NewWithDatabase(t, &coderdtest.Options{
WorkspaceUsageTrackerFlush: flushCh,
WorkspaceUsageTrackerTick: tickCh,
})
user := coderdtest.CreateFirstUser(t, client)
r := dbfake.WorkspaceBuild(t, db, database.Workspace{
OrganizationID: user.OrganizationID,
Expand Down Expand Up @@ -53,6 +60,10 @@ func TestWorkspaceAgentReportStats(t *testing.T) {
})
require.NoError(t, err)

tickCh <- dbtime.Now()
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

newWorkspace, err := client.Workspace(context.Background(), r.Workspace.ID)
require.NoError(t, err)

Expand Down
7 changes: 3 additions & 4 deletions coderd/workspaceapps/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,6 @@ func (s *Server) proxyWorkspaceApp(rw http.ResponseWriter, r *http.Request, appT
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))

report := newStatsReportFromSignedToken(appToken)
s.collectStats(report)
defer func() {
// We must use defer here because ServeHTTP may panic.
report.SessionEndedAt = dbtime.Now()
Expand All @@ -614,7 +613,8 @@ func (s *Server) proxyWorkspaceApp(rw http.ResponseWriter, r *http.Request, appT
// @Success 101
// @Router /workspaceagents/{workspaceagent}/pty [get]
func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, cancel := context.WithCancel(r.Context())
defer cancel()

s.websocketWaitMutex.Lock()
s.websocketWaitGroup.Add(1)
Expand Down Expand Up @@ -670,12 +670,11 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
})
return
}
go httpapi.HeartbeatClose(ctx, s.Logger, cancel, conn)

ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageBinary)
defer wsNetConn.Close() // Also closes conn.

go httpapi.Heartbeat(ctx, conn)

agentConn, release, err := s.AgentProvider.AgentConn(ctx, appToken.AgentID)
if err != nil {
log.Debug(ctx, "dial workspace agent", slog.Error(err))
Expand Down
5 changes: 2 additions & 3 deletions coderd/workspacestats/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
)

type Batcher interface {
Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool) error
Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: This does make the whole interface more convenient to use 👍

}

// DBBatcher holds a buffer of agent stats and periodically flushes them to
Expand Down Expand Up @@ -139,7 +139,7 @@ func (b *DBBatcher) Add(
workspaceID uuid.UUID,
st *agentproto.Stats,
usage bool,
) error {
) {
b.mu.Lock()
defer b.mu.Unlock()

Expand Down Expand Up @@ -176,7 +176,6 @@ func (b *DBBatcher) Add(
b.flushLever <- struct{}{}
b.flushForced.Store(true)
}
return nil
}

// Run runs the batcher.
Expand Down
6 changes: 3 additions & 3 deletions coderd/workspacestats/batcher_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestBatchStats(t *testing.T) {
// Given: a single data point is added for workspace
t2 := t1.Add(time.Second)
t.Logf("inserting 1 stat")
require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false))
b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false)

// When: it becomes time to report stats
// Signal a tick and wait for a flush to complete.
Expand All @@ -87,9 +87,9 @@ func TestBatchStats(t *testing.T) {
t.Logf("inserting %d stats", defaultBufferSize)
for i := 0; i < defaultBufferSize; i++ {
if i%2 == 0 {
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false))
b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false)
} else {
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randStats(t), false))
b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randStats(t), false)
}
}
}()
Expand Down
Loading
Loading
Oops, something went wrong.
0