From 60046f8904a0e3e4818408990192b2817e12dbc5 Mon Sep 17 00:00:00 2001 From: Donal Hurley Date: Fri, 19 Sep 2025 13:29:53 +0100 Subject: [PATCH 1/2] Fix how we handle log tailing when the line is nil (#1283) --- internal/datasource/nginx/log_tailer.go | 89 ++++++++++++++----------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/internal/datasource/nginx/log_tailer.go b/internal/datasource/nginx/log_tailer.go index 1b7d73bbf..8c0a28e1a 100644 --- a/internal/datasource/nginx/log_tailer.go +++ b/internal/datasource/nginx/log_tailer.go @@ -111,73 +111,95 @@ func NewLTSVTailer(file string) (*LTSVTailer, error) { return <SVTailer{t}, nil } +//nolint:revive // Can't extract anymore functions to reduce complexity func (t *Tailer) Tail(ctx context.Context, data chan<- string) { for { select { case line := <-t.handle.Lines: - lineContent := t.parseLine(line) + if line == nil { + return + } + + if line.Err != nil { + continue + } + + lineContent := line.Text if lineContent != "" { data <- lineContent } case <-ctx.Done(): handleContextDone(ctx) + err := t.handle.Stop() + if err != nil { + slog.ErrorContext(ctx, "Error stopping tailer", "error", err) + } + return } } } -func (t *Tailer) parseLine(line *tail.Line) string { - if line == nil { - return "" - } - - if line.Err != nil { - return "" - } - - return line.Text -} - +//nolint:revive // Can't extract anymore functions to reduce complexity func (t *PatternTailer) Tail(ctx context.Context, data chan<- map[string]string) { for { select { case line := <-t.handle.Lines: - lineContent := t.parseLine(line) + if line == nil { + return + } + + if line.Err != nil { + continue + } + + lineContent := t.gc.ParseString(line.Text) if lineContent != nil { data <- lineContent } case <-ctx.Done(): handleContextDone(ctx) - return - } - } -} + err := t.handle.Stop() + if err != nil { + slog.ErrorContext(ctx, "Error stopping tailer", "error", err) + } -func (t *PatternTailer) parseLine(line *tail.Line) map[string]string { - if line == nil { - return nil - } + slog.DebugContext(ctx, "Tailer is done") - if line.Err != nil { - return nil + return + } } - - return t.gc.ParseString(line.Text) } +//nolint:revive // Can't extract anymore functions to reduce complexity func (t *LTSVTailer) Tail(ctx context.Context, data chan<- map[string]string) { for { select { case line := <-t.handle.Lines: - lineText := t.parseLine(line) + if line == nil { + return + } + + if line.Err != nil { + continue + } + + lineText := t.parse(line.Text) if lineText != nil { data <- lineText } case <-ctx.Done(): handleContextDone(ctx) + err := t.handle.Stop() + if err != nil { + slog.ErrorContext(ctx, "Error stopping tailer", "error", err) + } + + slog.DebugContext(ctx, "Tailer is done") + return } } @@ -199,18 +221,6 @@ func (t *LTSVTailer) parse(line string) map[string]string { return lineMap } -func (t *LTSVTailer) parseLine(line *tail.Line) map[string]string { - if line == nil { - return nil - } - - if line.Err != nil { - return nil - } - - return t.parse(line.Text) -} - func handleContextDone(ctx context.Context) { ctxErr := ctx.Err() switch ctxErr { @@ -219,5 +229,4 @@ func handleContextDone(ctx context.Context) { case context.Canceled: slog.DebugContext(ctx, "Tailer forcibly canceled", "error", ctxErr) } - slog.DebugContext(ctx, "Tailer is done") } From 1fa281f1f134a59959aa3872ba5f3a668dc1454c Mon Sep 17 00:00:00 2001 From: Sean Breen Date: Tue, 23 Sep 2025 15:14:50 +0100 Subject: [PATCH 2/2] fix concurrent access to logs --- .../collector/logsgzipprocessor/processor_benchmark_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/collector/logsgzipprocessor/processor_benchmark_test.go b/internal/collector/logsgzipprocessor/processor_benchmark_test.go index b99baf5bb..505b49b09 100644 --- a/internal/collector/logsgzipprocessor/processor_benchmark_test.go +++ b/internal/collector/logsgzipprocessor/processor_benchmark_test.go @@ -83,7 +83,9 @@ func BenchmarkGzipProcessor_Concurrent(b *testing.B) { b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _ = p.ConsumeLogs(context.Background(), logs) + logsCopy := plog.NewLogs() + logs.CopyTo(logsCopy) + _ = p.ConsumeLogs(context.Background(), logsCopy) } }) }