feat: ensure that log batches don't exceed 1MiB in logSender · coder/coder@743f5f3
Commit 743f5f3

feat: ensure that log batches don't exceed 1MiB in logSender
1 parent 0515169 commit 743f5f3
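
In short, sendLoop now assembles each BatchCreateLogs request by walking the pending queue: any single log whose output alone exceeds 1 MiB is dropped with a warning, and the batch is cut as soon as the accumulated output plus a fixed 21-byte-per-log overhead would cross the 1 MiB limit. Only the logs actually sent are then removed from the queue. Below is a minimal, self-contained sketch of that batching rule as a hypothetical standalone helper (not the commit's code; the real loop operates on *proto.Log inside sendLoop and flushes one request per iteration, see agent/logs.go below):

```go
package main

import "fmt"

// batchUnder splits outputs into consecutive batches whose estimated wire
// size stays under maxBytes, charging a fixed per-item overhead on top of
// each item's length. Items that alone exceed maxBytes can never fit and
// are dropped, mirroring the "huge log" case in the commit.
func batchUnder(outputs []string, maxBytes, perItemOverhead int) [][]string {
	var batches [][]string
	var cur []string
	size := 0
	for _, out := range outputs {
		if len(out) > maxBytes {
			continue // oversized item: drop it
		}
		cost := len(out) + perItemOverhead
		if size+cost > maxBytes && len(cur) > 0 {
			batches = append(batches, cur) // cut the batch before overflowing
			cur, size = nil, 0
		}
		cur = append(cur, out)
		size += cost
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	// Same shape as TestLogSender_Batch below: 60,000 one-byte logs.
	outs := make([]string, 60000)
	for i := range outs {
		outs[i] = "r"
	}
	fmt.Println(len(batchUnder(outs, 1<<20, 21)), "batches") // prints "2 batches"
}
```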

File tree: 2 files changed, +122 −5 lines changed

  agent/logs.go
  agent/logs_internal_test.go

agent/logs.go

Lines changed: 25 additions & 5 deletions

```diff
@@ -13,7 +13,11 @@ import (
 	"github.com/coder/coder/v2/codersdk/agentsdk"
 )
 
-const flushInterval = time.Second
+const (
+	flushInterval     = time.Second
+	logOutputMaxBytes = 1 << 20 // 1MiB
+	overheadPerLog    = 21      // found by testing
+)
 
 type logQueue struct {
 	logs []*proto.Log
@@ -131,14 +135,30 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		return nil
 	}
 	src, q := l.getPendingWorkLocked()
+	logger := l.logger.With(slog.F("log_source_id", src))
 	q.flushRequested = false // clear flag since we're now flushing
 	req := &proto.BatchCreateLogsRequest{
 		LogSourceId: src[:],
-		Logs:        q.logs[:],
+	}
+	o := 0
+	n := 0
+	for n < len(q.logs) {
+		log := q.logs[n]
+		if len(log.Output) > logOutputMaxBytes {
+			logger.Warn(ctx, "dropping log line that exceeds our limit")
+			n++
+			continue
+		}
+		o += len(log.Output) + overheadPerLog
+		if o > logOutputMaxBytes {
+			break
+		}
+		req.Logs = append(req.Logs, log)
+		n++
 	}
 
 	l.L.Unlock()
-	l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
+	logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
 	resp, err := dest.BatchCreateLogs(ctx, req)
 	l.L.Lock()
 	if err != nil {
@@ -157,10 +177,10 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 	// Since elsewhere we only append to the logs, here we can remove them
 	// since we successfully sent them. First we nil the pointers though,
 	// so that they can be gc'd.
-	for i := 0; i < len(req.Logs); i++ {
+	for i := 0; i < n; i++ {
 		q.logs[i] = nil
 	}
-	q.logs = q.logs[len(req.Logs):]
+	q.logs = q.logs[n:]
 	if len(q.logs) == 0 {
 		// no empty queues
 		delete(l.queues, src)
```
agent/logs_internal_test.go

Lines changed: 97 additions & 0 deletions

```diff
@@ -9,6 +9,7 @@ import (
 
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
+	protobuf "google.golang.org/protobuf/proto"
 
 	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
@@ -165,6 +166,102 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
 	require.Len(t, uut.queues, 0)
 }
 
+func TestLogSender_SkipHugeLog(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	hugeLog := make([]byte, logOutputMaxBytes+1)
+	for i := range hugeLog {
+		hugeLog[i] = 'q'
+	}
+	err := uut.enqueue(ls1,
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    string(hugeLog),
+			Level:     codersdk.LogLevelInfo,
+		},
+		agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "test log 1, src 1",
+			Level:     codersdk.LogLevelInfo,
+		})
+	require.NoError(t, err)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	require.Len(t, req.Logs, 1, "it should skip the huge log")
+	require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
+	require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
+func TestLogSender_Batch(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	var logs []agentsdk.Log
+	for i := 0; i < 60000; i++ {
+		logs = append(logs, agentsdk.Log{
+			CreatedAt: t0,
+			Output:    "r",
+			Level:     codersdk.LogLevelInfo,
+		})
+	}
+	err := uut.enqueue(ls1, logs...)
+	require.NoError(t, err)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
+	// is about 21 bytes.
+	gotLogs := 0
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	gotLogs += len(req.Logs)
+	wire, err := protobuf.Marshal(req)
+	require.NoError(t, err)
+	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	gotLogs += len(req.Logs)
+	wire, err = protobuf.Marshal(req)
+	require.NoError(t, err)
+	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Equal(t, 60000, gotLogs)
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
 type fakeLogDest struct {
 	reqs  chan *proto.BatchCreateLogsRequest
 	resps chan *proto.BatchCreateLogsResponse
```
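
For reference, the two-request split that TestLogSender_Batch expects follows directly from the constants above: 60,000 logs at 1 byte of output plus the 21-byte estimated overhead each comes to 60,000 × 22 = 1,320,000 bytes, which exceeds 1 MiB (1,048,576 bytes). The first request can therefore carry at most ⌊1,048,576 / 22⌋ = 47,662 logs (1,048,564 bytes) and the second takes the remaining 12,338, which is why the test asserts two requests totaling exactly 60,000 logs, each marshaling to under 1 MiB on the wire.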
