feat: limit queued logs to database limit in agent · coder/coder@f25da2c · GitHub

Commit f25da2c

feat: limit queued logs to database limit in agent
1 parent 0aee68c commit f25da2c

File tree

2 files changed: +183 -53 lines changed

agent/logs.go

Lines changed: 50 additions & 28 deletions
@@ -14,9 +14,14 @@ import (
 )
 
 const (
-	flushInterval     = time.Second
-	logOutputMaxBytes = 1 << 20 // 1MiB
-	overheadPerLog    = 21      // found by testing
+	flushInterval    = time.Second
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	overheadPerLog   = 21      // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
 )
 
 type logQueue struct {
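
For context: 1048576 is 2^20 bytes, i.e. 1 MiB, so the new in-memory cap maxBytesQueued is the same size as the per-batch cap maxBytesPerBatch. As the comment notes, the value mirrors the max_logs_length constraint in dump.sql, since there is no point queuing more log data than the database will accept.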
@@ -33,6 +38,7 @@ type logSender struct {
 	queues           map[uuid.UUID]*logQueue
 	logger           slog.Logger
 	exceededLogLimit bool
+	outputLen        int
 }
 
 type logDest interface {
@@ -47,37 +53,50 @@ func newLogSender(logger slog.Logger) *logSender {
 	}
 }
 
-func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
+func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
 	logger := l.logger.With(slog.F("log_source_id", src))
 	if len(logs) == 0 {
 		logger.Debug(context.Background(), "enqueue called with no logs")
-		return nil
+		return
 	}
 	l.L.Lock()
 	defer l.L.Unlock()
 	if l.exceededLogLimit {
 		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
 		// don't error, as we also write to file and don't want the overall write to fail
-		return nil
+		return
 	}
 	defer l.Broadcast()
 	q, ok := l.queues[src]
 	if !ok {
 		q = &logQueue{}
 		l.queues[src] = q
 	}
-	for _, log := range logs {
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return
+		}
 		pl, err := agentsdk.ProtoFromLog(log)
 		if err != nil {
-			return xerrors.Errorf("failed to convert log: %w", err)
+			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
+			return
+		}
+		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
+			continue
 		}
 		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
 	}
 	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
-	return nil
 }
 
-func (l *logSender) flush(src uuid.UUID) error {
+func (l *logSender) flush(src uuid.UUID) {
 	l.L.Lock()
 	defer l.L.Unlock()
 	defer l.Broadcast()
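
To make the soft-cap behaviour above concrete, here is a minimal standalone sketch of the same enqueue pattern, using hypothetical names (byteQueue, maxQueuedBytes) rather than the agent's logSender and agentsdk types. The point it illustrates: the byte total is checked before each append, so the queue is allowed to overshoot the cap slightly, which is what lets the server-side "logs truncated" handling fire rather than the agent silently dropping everything.

// Standalone sketch, not the agent's code: a queue that stops accepting new
// lines once the queued byte total has already exceeded a cap.
package main

import "fmt"

const maxQueuedBytes = 1 << 20 // mirrors maxBytesQueued (1 MiB)

type byteQueue struct {
	lines     []string
	queuedLen int
}

// enqueue appends lines until the queued byte total has already passed the
// cap; it reports how many of the new lines were accepted.
func (q *byteQueue) enqueue(lines ...string) int {
	for k, line := range lines {
		if q.queuedLen > maxQueuedBytes {
			return k // remaining lines are truncated
		}
		q.lines = append(q.lines, line)
		q.queuedLen += len(line)
	}
	return len(lines)
}

func main() {
	q := &byteQueue{}
	big := string(make([]byte, 512*1024)) // 512 KiB per line
	accepted := q.enqueue(big, big, big, big)
	// Because the check runs before appending, three 512 KiB lines are
	// accepted (the total only exceeds 1 MiB after the third) and the
	// fourth is dropped.
	fmt.Println("accepted:", accepted, "queued bytes:", q.queuedLen)
}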
@@ -87,13 +106,21 @@ func (l *logSender) flush(src uuid.UUID) error {
 	}
 	// queue might not exist because it's already been flushed and removed from
 	// the map.
-	return nil
 }
 
 // sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
 // retry as it is expected that a higher layer retries establishing connection to the agent API and
 // calls sendLoop again.
 func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
+	l.L.Lock()
+	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
+		// no point in keeping this loop going, if log limit is exceeded, but don't return an
+		// error because we've already handled it
+		return nil
+	}
+
 	ctxDone := false
 	defer l.logger.Debug(ctx, "sendLoop exiting")
 
@@ -119,42 +146,36 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		}
 	}()
 
-	l.L.Lock()
-	defer l.L.Unlock()
 	for {
-		for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
+		for !ctxDone && !l.hasPendingWorkLocked() {
 			l.Wait()
 		}
 		if ctxDone {
 			return nil
 		}
-		if l.exceededLogLimit {
-			l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
-			// no point in keeping this loop going, if log limit is exceeded, but don't return an
-			// error because we've already handled it
-			return nil
-		}
+
 		src, q := l.getPendingWorkLocked()
 		logger := l.logger.With(slog.F("log_source_id", src))
 		q.flushRequested = false // clear flag since we're now flushing
 		req := &proto.BatchCreateLogsRequest{
 			LogSourceId: src[:],
 		}
-		o := 0
+
+		// outputToSend keeps track of the size of the protobuf message we send, while
+		// outputToRemove keeps track of the size of the output we'll remove from the queues on
+		// success. They are different because outputToSend also counts protocol message overheads.
+		outputToSend := 0
+		outputToRemove := 0
 		n := 0
 		for n < len(q.logs) {
 			log := q.logs[n]
-			if len(log.Output) > logOutputMaxBytes {
-				logger.Warn(ctx, "dropping log line that exceeds our limit")
-				n++
-				continue
-			}
-			o += len(log.Output) + overheadPerLog
-			if o > logOutputMaxBytes {
+			outputToSend += len(log.Output) + overheadPerLog
+			if outputToSend > maxBytesPerBatch {
 				break
 			}
 			req.Logs = append(req.Logs, log)
 			n++
+			outputToRemove += len(log.Output)
 		}
 
 		l.L.Unlock()
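
The batching arithmetic above is easy to check with a small standalone sketch (a hypothetical packBatch helper, not the agent's code): the running total that decides when to stop a batch includes the fixed 21-byte per-log overhead, while the total that will later be subtracted from the in-memory accounting counts payload bytes only.

// Standalone sketch of packing queued payloads into one byte-budgeted batch.
package main

import "fmt"

const (
	maxBytesPerBatch = 1 << 20 // 1 MiB, as in the commit
	overheadPerItem  = 21      // per-log protocol overhead, as in the commit
)

// packBatch returns how many queued payloads fit in one batch, plus the two
// running totals the sender would track. toSend includes the item that tipped
// the budget over; as in the commit, it is only used as a stop condition.
func packBatch(payloads [][]byte) (n, toSend, toRemove int) {
	for n < len(payloads) {
		p := payloads[n]
		toSend += len(p) + overheadPerItem
		if toSend > maxBytesPerBatch {
			break // this payload stays queued for the next batch
		}
		n++
		toRemove += len(p)
	}
	return n, toSend, toRemove
}

func main() {
	// 2000 one-KiB lines: each costs 1024+21 = 1045 budget bytes, so
	// floor(1048576/1045) = 1003 lines fit in the first batch.
	payloads := make([][]byte, 2000)
	for i := range payloads {
		payloads[i] = make([]byte, 1024)
	}
	n, toSend, toRemove := packBatch(payloads)
	fmt.Println(n, toSend, toRemove) // 1003 1049180 1027072
}

Note that the enqueue change earlier already drops any single line whose payload plus overhead exceeds maxBytesPerBatch, so this loop always adds at least one log per batch.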
@@ -181,6 +202,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 			q.logs[i] = nil
 		}
 		q.logs = q.logs[n:]
+		l.outputLen -= outputToRemove
 		if len(q.logs) == 0 {
 			// no empty queues
 			delete(l.queues, src)
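
Taken together, outputLen now tracks queued payload bytes end to end: enqueue adds len(pl.Output) for each accepted log and refuses new logs once the total has passed maxBytesQueued, while sendLoop subtracts outputToRemove only after a batch has been sent successfully, so bytes still in flight keep counting against the cap.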

0 commit comments