8000 feat: add logSender for sending logs on agent v2 API by spikecurtis · Pull Request #12046 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

feat: add logSender for sending logs on agent v2 API #12046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add logSender for sending logs on agent v2 API
  • Loading branch information
spikecurtis committed Feb 15, 2024
commit ce5ed91c76d7337d29ee2f8645251e2fff6d1922
174 changes: 174 additions & 0 deletions agent/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package agent

import (
"context"
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk/agentsdk"
)

// flushInterval is the maximum time logs wait in a queue before the
// sendLoop attempts to deliver them, absent an explicit flush request.
const flushInterval = time.Second

// logQueue buffers the pending logs for a single log source.
type logQueue struct {
	// logs awaiting delivery, in enqueue order.
	logs []*proto.Log
	// flushRequested is set by logSender.flush so the sendLoop sends this
	// queue without waiting for flushInterval to elapse.
	flushRequested bool
	// lastFlush records when this queue was last successfully sent.
	lastFlush time.Time
}

// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
// the agent calls sendLoop to send pending logs.
type logSender struct {
	// Cond's Locker (l.L) guards queues; Broadcast wakes the sendLoop when
	// new logs arrive, a flush is requested, or the context is canceled.
	*sync.Cond
	// queues maps a log source ID to its pending logs.
	queues map[uuid.UUID]*logQueue
	logger slog.Logger
}

// logDest is the subset of the agent API needed to deliver batched logs,
// abstracted as an interface so tests can supply a fake.
type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}

// newLogSender returns a logSender ready to accept enqueued logs; call
// sendLoop once a connection to the agent API is established.
func newLogSender(logger slog.Logger) *logSender {
	var mu sync.Mutex
	return &logSender{
		Cond:   sync.NewCond(&mu),
		queues: map[uuid.UUID]*logQueue{},
		logger: logger,
	}
}

// enqueue converts logs for the given source to protobuf, appends them to the
// source's queue, and wakes the sendLoop. It returns an error only if a log
// fails to convert; in that case nothing from this call is enqueued.
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
	logger := l.logger.With(slog.F("log_source_id", src))
	if len(logs) == 0 {
		logger.Debug(context.Background(), "enqueue called with no logs")
		return nil
	}
	// Convert before taking the lock, and all-or-nothing: converting while
	// appending meant a failure partway through left earlier logs from the
	// same call enqueued, so a partial batch could be sent.
	pLogs := make([]*proto.Log, 0, len(logs))
	for _, log := range logs {
		pl, err := agentsdk.ProtoFromLog(log)
		if err != nil {
			return xerrors.Errorf("failed to convert log: %w", err)
		}
		pLogs = append(pLogs, pl)
	}
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if !ok {
		q = &logQueue{}
		l.queues[src] = q
	}
	q.logs = append(q.logs, pLogs...)
	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
	return nil
}

// flush marks the queue for src as needing immediate delivery and wakes the
// sendLoop. Flushing a source with no queue is a no-op, since the queue might
// already have been sent and removed from the map.
func (l *logSender) flush(src uuid.UUID) error {
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	if q, found := l.queues[src]; found {
		q.flushRequested = true
	}
	return nil
}

// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry as it is expected that a higher layer retries establishing connection to the agent API and
// calls sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take into consideration what context is passed here when this code is used. On shutdown the context will likely be cancelled but we still want to try to send all the logs.

ctxDone := false
defer l.logger.Debug(ctx, "sendLoop exiting")

// wake 4 times per flush interval to check if anything needs to be flushed
go func() {
tkr := time.NewTicker(flushInterval / 4)
defer tkr.Stop()
for {
select {
// also monitor the context here, so we notice immediately, rather
// than waiting for the next tick or logs
case <-ctx.Done():
l.L.Lock()
ctxDone = true
l.L.Unlock()
l.Broadcast()
return
case <-tkr.C:
l.Broadcast()
}
}
}()

l.L.Lock()
defer l.L.Unlock()
for {
for !ctxDone && !l.hasPendingWorkLocked() {
l.Wait()
}
if ctxDone {
return nil
}
src, q := l.getPendingWorkLocked()
q.flushRequested = false // clear flag since we're now flushing
req := &proto.BatchCreateLogsRequest{
LogSourceId: src[:],
Logs: q.logs[:],
}

l.L.Unlock()
l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
_, err := dest.BatchCreateLogs(ctx, req)
l.L.Lock()
if err != nil {
return xerrors.Errorf("failed to upload logs: %w", err)
}

// Since elsewhere we only append to the logs, here we can remove them
// since we successfully sent them. First we nil the pointers though,
// so that they can be gc'd.
for i := 0; i < len(req.Logs); i++ {
q.logs[i] = nil
}
q.logs = q.logs[len(req.Logs):]
if len(q.logs) == 0 {
// no empty queues
delete(l.queues, src)
continue
}
q.lastFlush = time.Now()
}
}

// hasPendingWorkLocked reports whether any queue either has an explicit flush
// request or has gone longer than flushInterval since its last flush. The
// caller must hold l.L.
func (l *logSender) hasPendingWorkLocked() bool {
	for _, q := range l.queues {
		if q.flushRequested || time.Since(q.lastFlush) > flushInterval {
			return true
		}
	}
	return false
}

// getPendingWorkLocked picks the queue with the oldest lastFlush so that we
// have some sense of fairness across sources. The caller must hold l.L;
// with an empty map it returns zero values.
func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
	first := true
	var oldest time.Time
	for candidateSrc, candidateQ := range l.queues {
		if first || candidateQ.lastFlush.Before(oldest) {
			src = candidateSrc
			q = candidateQ
			oldest = candidateQ.lastFlush
			first = false
		}
	}
	return src, q
}
141 changes: 141 additions & 0 deletions agent/logs_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package agent

import (
"context"
"testing"
"time"

"golang.org/x/exp/slices"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/testutil"
)

func TestLogSender(t *testing.T) {
t.Parallel()
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
fDest := newFakeLogDest()
uut := newLogSender(logger)

t0 := dbtime.Now()

ls1 := uuid.UUID{0x11}
err := uut.enqueue(ls1, agentsdk.Log{
CreatedAt: t0,
Output: "test log 0, src 1",
Level: codersdk.LogLevelInfo,
})
require.NoError(t, err)

ls2 := uuid.UUID{0x22}
err = uut.enqueue(ls2,
agentsdk.Log{
CreatedAt: t0,
Output: "test log 0, src 2",
Level: codersdk.LogLevelError,
},
agentsdk.Log{
CreatedAt: t0,
Output: "test log 1, src 2",
Level: codersdk.LogLevelWarn,
},
)
require.NoError(t, err)

loopErr := make(chan error, 1)
go func() {
err := uut.sendLoop(ctx, fDest)
loopErr <- err
}()

// since neither source has even been flushed, it should immediately flush
// both, although the order is not controlled
var logReqs []*proto.BatchCreateLogsRequest
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
for _, req := range logReqs {
require.NotNil(t, req)
srcID, err := uuid.FromBytes(req.LogSourceId)
require.NoError(t, err)
switch srcID {
case ls1:
require.Len(t, req.Logs, 1)
require.Equal(t, "test log 0, src 1", req.Logs[0].GetOutput())
require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
case ls2:
require.Len(t, req.Logs, 2)
require.Equal(t, "test log 0, src 2", req.Logs[0].GetOutput())
require.Equal(t, proto.Log_ERROR, req.Logs[0].GetLevel())
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
require.Equal(t, "test log 1, src 2", req.Logs[1].GetOutput())
require.Equal(t, proto.Log_WARN, req.Logs[1].GetLevel())
require.Equal(t, t0, req.Logs[1].GetCreatedAt().AsTime())
default:
t.Fatal("unknown log source")
}
}

t1 := dbtime.Now()
err = uut.enqueue(ls1, agentsdk.Log{
CreatedAt: t1,
Output: "test log 1, src 1",
Level: codersdk.LogLevelDebug,
})
require.NoError(t, err)
err = uut.flush(ls1)
require.NoError(t, err)

req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
// give ourselves a 25% buffer if we're right on the cusp of a tick
require.LessOrEqual(t, time. A92E Since(t1), flushInterval*5/4)
require.NotNil(t, req)
require.Len(t, req.Logs, 1)
require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())

cancel()
err = testutil.RequireRecvCtx(testCtx, t, loopErr)
require.NoError(t, err)

// we can still enqueue more logs after sendLoop returns
err = uut.enqueue(ls1, agentsdk.Log{
CreatedAt: t1,
Output: "test log 2, src 1",
Level: codersdk.LogLevelTrace,
})
require.NoError(t, err)
}

// fakeLogDest implements logDest for tests, handing each request to the test
// over an unbuffered channel so the test can assert on it.
type fakeLogDest struct {
	reqs chan *proto.BatchCreateLogsRequest
}

// BatchCreateLogs delivers the request to the test via f.reqs, honoring
// context cancellation while no receiver is ready.
func (f fakeLogDest) BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
	// clone the logs so that modifications the sender makes don't affect our tests. In production
	// these would be serialized/deserialized so we don't have to worry too much.
	request.Logs = slices.Clone(request.Logs)
	select {
	case f.reqs <- request:
		return &proto.BatchCreateLogsResponse{}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// newFakeLogDest returns a fakeLogDest whose request channel is unbuffered,
// so every send blocks until the test receives it.
func newFakeLogDest() *fakeLogDest {
	dest := &fakeLogDest{
		reqs: make(chan *proto.BatchCreateLogsRequest),
	}
	return dest
}
13 changes: 13 additions & 0 deletions codersdk/agentsdk/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/google/uuid"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
Expand Down Expand Up @@ -298,3 +299,15 @@ func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAp
}
return pReq, nil
}

// ProtoFromLog converts an agentsdk Log to its protobuf form, returning an
// error when the log level does not match a defined proto.Log_Level.
func ProtoFromLog(log Log) (*proto.Log, error) {
	levelName := strings.ToUpper(string(log.Level))
	lvl, ok := proto.Log_Level_value[levelName]
	if !ok {
		return nil, xerrors.Errorf("unknown log level: %s", log.Level)
	}
	pl := &proto.Log{
		CreatedAt: timestamppb.New(log.CreatedAt),
		Output:    log.Output,
		Level:     proto.Log_Level(lvl),
	}
	return pl, nil
}
0