-
Notifications
You must be signed in to change notification settings - Fork 943
feat: add logSender for sending logs on agent v2 API #12046
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
ce5ed91
feat: add logSender for sending logs on agent v2 API
spikecurtis 92abdf6
feat: handle log limit exceeded in logSender
spikecurtis 0aee68c
feat: ensure that log batches don't exceed 1MiB in logSender
spikecurtis f25da2c
feat: limit queued logs to database limit in agent
spikecurtis File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next
Next commit
feat: add logSender for sending logs on agent v2 API
- Loading branch information
commit ce5ed91c76d7337d29ee2f8645251e2fff6d1922
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
package agent | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"golang.org/x/xerrors" | ||
|
||
"cdr.dev/slog" | ||
"github.com/coder/coder/v2/agent/proto" | ||
"github.com/coder/coder/v2/codersdk/agentsdk" | ||
) | ||
|
||
// flushInterval is the maximum time a queue's logs wait before sendLoop considers the queue due
// for sending, even if no explicit flush was requested.
const flushInterval = time.Second
|
||
type logQueue struct { | ||
logs []*proto.Log | ||
flushRequested bool | ||
lastFlush time.Time | ||
} | ||
|
||
// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the | ||
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available, | ||
// the agent calls sendLoop to send pending logs. | ||
type logSender struct { | ||
*sync.Cond | ||
queues map[uuid.UUID]*logQueue | ||
logger slog.Logger | ||
} | ||
|
||
type logDest interface { | ||
BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) | ||
} | ||
|
||
func newLogSender(logger slog.Logger) *logSender { | ||
return &logSender{ | ||
Cond: sync.NewCond(&sync.Mutex{}), | ||
logger: logger, | ||
queues: make(map[uuid.UUID]*logQueue), | ||
} | ||
} | ||
|
||
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error { | ||
logger := l.logger.With(slog.F("log_source_id", src)) | ||
if len(logs) == 0 { | ||
logger.Debug(context.Background(), "enqueue called with no logs") | ||
return nil | ||
} | ||
l.L.Lock() | ||
defer l.L.Unlock() | ||
defer l.Broadcast() | ||
q, ok := l.queues[src] | ||
if !ok { | ||
q = &logQueue{} | ||
l.queues[src] = q | ||
} | ||
for _, log := range logs { | ||
pl, err := agentsdk.ProtoFromLog(log) | ||
if err != nil { | ||
return xerrors.Errorf("failed to convert log: %w", err) | ||
} | ||
q.logs = append(q.logs, pl) | ||
} | ||
logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs))) | ||
return nil | ||
} | ||
|
||
func (l *logSender) flush(src uuid.UUID) error { | ||
l.L.Lock() | ||
defer l.L.Unlock() | ||
defer l.Broadcast() | ||
q, ok := l.queues[src] | ||
if ok { | ||
q.flushRequested = true | ||
} | ||
// queue might not exist because it's already been flushed and removed from | ||
// the map. | ||
return nil | ||
} | ||
|
||
// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not | ||
// retry as it is expected that a higher layer retries establishing connection to the agent API and | ||
// calls sendLoop again. | ||
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please take into consideration what context is passed here when this code is used. On shutdown the context will likely be cancelled but we still want to try to send all the logs. |
||
ctxDone := false | ||
defer l.logger.Debug(ctx, "sendLoop exiting") | ||
|
||
// wake 4 times per flush interval to check if anything needs to be flushed | ||
go func() { | ||
tkr := time.NewTicker(flushInterval / 4) | ||
spikecurtis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer tkr.Stop() | ||
for { | ||
select { | ||
// also monitor the context here, so we notice immediately, rather | ||
// than waiting for the next tick or logs | ||
case <-ctx.Done(): | ||
l.L.Lock() | ||
ctxDone = true | ||
l.L.Unlock() | ||
l.Broadcast() | ||
spikecurtis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
case <-tkr.C: | ||
l.Broadcast() | ||
} | ||
} | ||
}() | ||
|
||
l.L.Lock() | ||
defer l.L.Unlock() | ||
for { | ||
for !ctxDone && !l.hasPendingWorkLocked() { | ||
l.Wait() | ||
} | ||
if ctxDone { | ||
return nil | ||
} | ||
src, q := l.getPendingWorkLocked() | ||
q.flushRequested = false // clear flag since we're now flushing | ||
req := &proto.BatchCreateLogsRequest{ | ||
LogSourceId: src[:], | ||
Logs: q.logs[:], | ||
} | ||
|
||
l.L.Unlock() | ||
l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs))) | ||
_, err := dest.BatchCreateLogs(ctx, req) | ||
l.L.Lock() | ||
if err != nil { | ||
return xerrors.Errorf("failed to upload logs: %w", err) | ||
spikecurtis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Since elsewhere we only append to the logs, here we can remove them | ||
// since we successfully sent them. First we nil the pointers though, | ||
// so that they can be gc'd. | ||
for i := 0; i < len(req.Logs); i++ { | ||
q.logs[i] = nil | ||
} | ||
q.logs = q.logs[len(req.Logs):] | ||
spikecurtis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if len(q.logs) == 0 { | ||
// no empty queues | ||
delete(l.queues, src) | ||
continue | ||
} | ||
q.lastFlush = time.Now() | ||
} | ||
} | ||
|
||
func (l *logSender) hasPendingWorkLocked() bool { | ||
for _, q := range l.queues { | ||
if time.Since(q.lastFlush) > flushInterval { | ||
return true | ||
} | ||
if q.flushRequested { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) { | ||
// take the one it's been the longest since we've flushed, so that we have some sense of | ||
// fairness across sources | ||
var earliestFlush time.Time | ||
for is, iq := range l.queues { | ||
if q == nil || iq.lastFlush.Before(earliestFlush) { | ||
spikecurtis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
src = is | ||
q = iq | ||
earliestFlush = iq.lastFlush | ||
} | ||
} | ||
return src, q | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
package agent | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"golang.org/x/exp/slices" | ||
|
||
"github.com/google/uuid" | ||
"github.com/stretchr/testify/require" | ||
|
||
"cdr.dev/slog" | ||
"cdr.dev/slog/sloggers/slogtest" | ||
"github.com/coder/coder/v2/agent/proto" | ||
"github.com/coder/coder/v2/coderd/database/dbtime" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/coder/coder/v2/codersdk/agentsdk" | ||
"github.com/coder/coder/v2/testutil" | ||
) | ||
|
||
func TestLogSender(t *testing.T) { | ||
t.Parallel() | ||
testCtx := testutil.Context(t, testutil.WaitShort) | ||
ctx, cancel := context.WithCancel(testCtx) | ||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) | ||
fDest := newFakeLogDest() | ||
uut := newLogSender(logger) | ||
|
||
t0 := dbtime.Now() | ||
|
||
ls1 := uuid.UUID{0x11} | ||
err := uut.enqueue(ls1, agentsdk.Log{ | ||
CreatedAt: t0, | ||
Output: "test log 0, src 1", | ||
Level: codersdk.LogLevelInfo, | ||
}) | ||
require.NoError(t, err) | ||
|
||
ls2 := uuid.UUID{0x22} | ||
err = uut.enqueue(ls2, | ||
agentsdk.Log{ | ||
CreatedAt: t0, | ||
Output: "test log 0, src 2", | ||
Level: codersdk.LogLevelError, | ||
}, | ||
agentsdk.Log{ | ||
CreatedAt: t0, | ||
Output: "test log 1, src 2", | ||
Level: codersdk.LogLevelWarn, | ||
}, | ||
) | ||
require.NoError(t, err) | ||
|
||
loopErr := make(chan error, 1) | ||
go func() { | ||
err := uut.sendLoop(ctx, fDest) | ||
loopErr <- err | ||
}() | ||
|
||
// since neither source has even been flushed, it should immediately flush | ||
// both, although the order is not controlled | ||
var logReqs []*proto.BatchCreateLogsRequest | ||
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs)) | ||
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs)) | ||
for _, req := range logReqs { | ||
require.NotNil(t, req) | ||
srcID, err := uuid.FromBytes(req.LogSourceId) | ||
require.NoError(t, err) | ||
switch srcID { | ||
case ls1: | ||
require.Len(t, req.Logs, 1) | ||
require.Equal(t, "test log 0, src 1", req.Logs[0].GetOutput()) | ||
require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel()) | ||
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime()) | ||
case ls2: | ||
require.Len(t, req.Logs, 2) | ||
require.Equal(t, "test log 0, src 2", req.Logs[0].GetOutput()) | ||
require.Equal(t, proto.Log_ERROR, req.Logs[0].GetLevel()) | ||
require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime()) | ||
require.Equal(t, "test log 1, src 2", req.Logs[1].GetOutput()) | ||
require.Equal(t, proto.Log_WARN, req.Logs[1].GetLevel()) | ||
require.Equal(t, t0, req.Logs[1].GetCreatedAt().AsTime()) | ||
default: | ||
t.Fatal("unknown log source") | ||
} | ||
} | ||
|
||
t1 := dbtime.Now() | ||
err = uut.enqueue(ls1, agentsdk.Log{ | ||
CreatedAt: t1, | ||
Output: "test log 1, src 1", | ||
Level: codersdk.LogLevelDebug, | ||
}) | ||
require.NoError(t, err) | ||
err = uut.flush(ls1) | ||
require.NoError(t, err) | ||
|
||
req := testutil.RequireRecvCtx(ctx, t, fDest.reqs) | ||
// give ourselves a 25% buffer if we're right on the cusp of a tick | ||
require.LessOrEqual(t, time. A92E Since(t1), flushInterval*5/4) | ||
require.NotNil(t, req) | ||
require.Len(t, req.Logs, 1) | ||
require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput()) | ||
require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel()) | ||
require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime()) | ||
|
||
cancel() | ||
err = testutil.RequireRecvCtx(testCtx, t, loopErr) | ||
require.NoError(t, err) | ||
|
||
// we can still enqueue more logs after sendLoop returns | ||
err = uut.enqueue(ls1, agentsdk.Log{ | ||
CreatedAt: t1, | ||
Output: "test log 2, src 1", | ||
Level: codersdk.LogLevelTrace, | ||
}) | ||
require.NoError(t, err) | ||
} | ||
|
||
type fakeLogDest struct { | ||
reqs chan *proto.BatchCreateLogsRequest | ||
} | ||
|
||
func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) { | ||
// clone the logs so that modifications the sender makes don't affect our tests. In production | ||
// these would be serialized/deserialized so we don't have to worry too much. | ||
req.Logs = slices.Clone(req.Logs) | ||
select { | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
case f.reqs <- req: | ||
return &proto.BatchCreateLogsResponse{}, nil | ||
} | ||
} | ||
|
||
func newFakeLogDest() *fakeLogDest { | ||
return &fakeLogDest{ | ||
reqs: make(chan *proto.BatchCreateLogsRequest), | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.