8000 feat: add WaitUntilEmpty to LogSender · coder/coder@2f52a67 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2f52a67

Browse files
committed
feat: add WaitUntilEmpty to LogSender
1 parent 900e32a commit 2f52a67

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

codersdk/agentsdk/logs.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,34 @@ func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger {
487487
return ScriptLogger{srcID: logSourceID, sender: l}
488488
}
489489

490+
// WaitUntilEmpty waits until the LogSender's queues are empty or the given context expires.
491+
func (l *LogSender) WaitUntilEmpty(ctx context.Context) error {
492+
ctxDone := false
493+
nevermind := make(chan struct{})
494+
defer close(nevermind)
495+
go func() {
496+
select {
497+
case <-ctx.Done():
498+
l.L.Lock()
499+
defer l.L.Unlock()
500+
ctxDone = true
501+
l.Broadcast()
502+
return
503+
case <-nevermind:
504+
return
505+
}
506+
}()
507+
l.L.Lock()
508+
defer l.L.Unlock()
509+
for len(l.queues) != 0 && !ctxDone {
510+
l.Wait()
511+
}
512+
if len(l.queues) == 0 {
513+
return nil
514+
}
515+
return ctx.Err()
516+
}
517+
490518
type ScriptLogger struct {
491519
sender *LogSender
492520
srcID uuid.UUID

codersdk/agentsdk/logs_internal_test.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func TestLogSender_Mainline(t *testing.T) {
5656
loopErr <- err
5757
}()
5858

59+
empty := make(chan error, 1)
60+
go func() {
61+
err := uut.WaitUntilEmpty(ctx)
62+
empty <- err
63+
}()
64+
5965
// since neither source has even been flushed, it should immediately Flush
6066
// both, although the order is not controlled
6167
var logReqs []*proto.BatchCreateLogsRequest
@@ -104,8 +110,11 @@ func TestLogSender_Mainline(t *testing.T) {
104110
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
105111
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())
106112

113+
err := testutil.RequireRecvCtx(ctx, t, empty)
114+
require.NoError(t, err)
115+
107116
cancel()
108-
err := testutil.RequireRecvCtx(testCtx, t, loopErr)
117+
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
109118
require.ErrorIs(t, err, context.Canceled)
110119

111120
// we can still enqueue more logs after SendLoop returns
@@ -363,6 +372,33 @@ func TestLogSender_SendError(t *testing.T) {
363372
uut.L.Unlock()
364373
}
365374

375+
func TestLogSender_WaitUntilEmpty_ContextExpired(t *testing.T) {
376+
t.Parallel()
377+
testCtx := testutil.Context(t, testutil.WaitShort)
378+
ctx, cancel := context.WithCancel(testCtx)
379+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
380+
uut := NewLogSender(logger)
381+
382+
t0 := dbtime.Now()
383+
384+
ls1 := uuid.UUID{0x11}
385+
uut.Enqueue(ls1, Log{
386+
CreatedAt: t0,
387+
Output: "test log 0, src 1",
388+
Level: codersdk.LogLevelInfo,
389+
})
390+
391+
empty := make(chan error, 1)
392+
go func() {
393+
err := uut.WaitUntilEmpty(ctx)
394+
empty <- err
395+
}()
396+
397+
cancel()
398+
err := testutil.RequireRecvCtx(testCtx, t, empty)
399+
require.ErrorIs(t, err, context.Canceled)
400+
}
401+
366402
type fakeLogDest struct {
367403
reqs chan *proto.BatchCreateLogsRequest
368404
resps chan *proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)
0