10BC0 Calls to pipeline produce do not exit early and accidentally corrupt … · open-telemetry/opentelemetry-go@93aa664 · GitHub
[go: up one dir, main page]

Skip to content

Commit 93aa664

Browse files
committed
Calls to pipeline produce do not exit early and accidentally corrupt aggregation state
1 parent 0150494 commit 93aa664

File tree

2 files changed

+135
-13
lines changed

2 files changed

+135
-13
lines changed

sdk/metric/pipeline.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,14 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
121121
//
122122
// This method is safe to call concurrently.
123123
func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
124+
// Only check if context is already cancelled before starting, not inside or after callback loops.
125+
// If this method returns after executing some callbacks but before running all aggregations,
126+
// internal aggregation state can be corrupted and result in incorrect data returned
127+
// by future produce calls.
128+
if err := ctx.Err(); err != nil {
129+
return err
130+
}
131+
124132
p.Lock()
125133
defer p.Unlock()
126134

@@ -130,26 +138,13 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
130138
if e := c(ctx); e != nil {
131139
err = errors.Join(err, e)
132140
}
133-
if err := ctx.Err(); err != nil {
134-
rm.Resource = nil
135-
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
136-
rm.ScopeMetrics = rm.ScopeMetrics[:0]
137-
return err
138-
}
139141
}
140142
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
141143
// TODO make the callbacks parallel. ( #3034 )
142144
f := e.Value.(multiCallback)
143145
if e := f(ctx); e != nil {
144146
err = errors.Join(err, e)
145147
}
146-
if err := ctx.Err(); err != nil {
147-
// This means the context expired before we finished running callbacks.
148-
rm.Resource = nil
149-
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
150-
rm.ScopeMetrics = rm.ScopeMetrics[:0]
151-
return err
152-
}
153148
}
154149

155150
rm.Resource = p.resource

sdk/metric/pipeline_test.go

Lines changed: 127 additions & 0 deletions
AE7F
Original file line numberDiff line numberDiff line change
@@ -613,3 +613,130 @@ func TestPipelineWithMultipleReaders(t *testing.T) {
613613
assert.Equal(t, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)
614614
}
615615
}
616+
617+
func TestPipelineProduceErrors(t *testing.T) {
618+
// Create a test pipeline with aggregations
619+
pipeReader := NewManualReader()
620+
pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter)
621+
622+
// Set up an observable with callbacks
623+
var testObsID observableID[int64]
624+
aggBuilder := aggregate.Builder[int64]{Temporality: metricdata.CumulativeTemporality}
625+
measure, _ := aggBuilder.Sum(true)
626+
pipe.addInt64Measure(testObsID, []aggregate.Measure[int64]{measure})
627+
628+
// Add an aggregation that just sets the data point value to the number of times the aggregation is invoked
629+
aggCallCount := 0
630+
inst := instrumentSync{
631+
name: "test-metric",
632+
description: "test description",
633+
unit: "test unit",
634+
compAgg: func(dest *metricdata.Aggregation) int {
635+
aggCallCount++
636+
637+
*dest = metricdata.Sum[int64]{
638+
Temporality: metricdata.CumulativeTemporality,
639+
IsMonotonic: false,
640+
DataPoints: []metricdata.DataPoint[int64]{{Value: int64(aggCallCount)}},
641+
}
642+
return aggCallCount
643+
},
644+
}
645+
pipe.addSync(instrumentation.Scope{Name: "test"}, inst)
646+
647+
ctx, cancelCtx := context.WithCancel(context.Background())
648+
var shouldCancelContext bool // When true, the second callback cancels ctx
649+
var shouldReturnError bool // When true, the third callback returns an error
650+
var callbackCounts [3]int
651+
652+
// Callback 1: cancels the context during execution but continues to populate data
653+
pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error {
654+
callbackCounts[0]++
655+
for _, m := range pipe.int64Measures[testObsID] {
656+
m(ctx, 123, *attribute.EmptySet())
657+
}
658+
return nil
659+
})
660+
661+
// Callback 2: populates int64 observable data
662+
pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error {
663+
callbackCounts[1]++
664+
if shouldCancelContext {
665+
cancelCtx()
666+
}
667+
return nil
668+
})
669+
670+
// Callback 3: return an error
671+
pipe.callbacks = append(pipe.callbacks, func(ctx context.Context) error {
672+
callbackCounts[2]++
673+
if shouldReturnError {
674+
return fmt.Errorf("test callback error")
675+
}
676+
return nil
677+
})
678+
679+
assertMetrics := func(rm *metricdata.ResourceMetrics, expectVal int64) {
680+
require.Len(t, rm.ScopeMetrics, 1)
681+
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
682+
metricdatatest.AssertEqual(t, metricdata.Metrics{
683+
Name: inst.name,
684+
Description: inst.description,
685+
Unit: inst.unit,
686+
Data: metricdata.Sum[int64]{
687+
Temporality: metricdata.CumulativeTemporality,
688+
IsMonotonic: false,
689+
DataPoints: []metricdata.DataPoint[int64]{{Value: expectVal}},
690+
},
691+
}, rm.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
692+
}
693+
694+
t.Run("no errors", func(t *testing.T) {
695+
var rm metricdata.ResourceMetrics
696+
err := pipe.produce(ctx, &rm)
697+
require.NoError(t, err)
698+
699+
assert.Equal(t, [3]int{1, 1, 1}, callbackCounts)
700+
assert.Equal(t, 1, aggCallCount)
701+
702+
assertMetrics(&rm, 1)
703+
})
704+
705+
t.Run("callback returns error", func(t *testing.T) {
706+
shouldReturnError = true
707+
708+
var rm metricdata.ResourceMetrics
709+
err := pipe.produce(ctx, &rm)
710+
require.ErrorContains(t, err, "test callback error")
711+
712+
// Even though a callback returned an error, the agg function is still called
713+
assert.Equal(t, [3]int{2, 2, 2}, callbackCounts)
714+
assert.Equal(t, 2, aggCallCount)
715+
716+
assertMetrics(&rm, 2)
717+
})
718+
719+
t.Run("context canceled during produce", func(t *testing.T) {
720+
shouldCancelContext = true
721+
722+
var rm metricdata.ResourceMetrics
723+
err := pipe.produce(ctx, &rm)
724+
require.ErrorContains(t, err, "test callback error")
725+
726+
// Even though the context was canceled midway through invoking callbacks,
727+
// all remaining callbacks and agg functions are still called
728+
assert.Equal(t, [3]int{3, 3, 3}, callbackCounts)
729+
assert.Equal(t, 3, aggCallCount)
730+
})
731+
732+
t.Run("context already cancelled", func(t *testing.T) {
733+
var output metricdata.ResourceMetrics
734+
err := pipe.produce(ctx, &output)
735+
require.ErrorIs(t, err, context.Canceled)
736+
737+
// No callbacks or agg functions are called since the context was canceled prior to invoking
738+
// the produce method
739+
assert.Equal(t, [3]int{3, 3, 3}, callbackCounts)
740+
assert.Equal(t, 3, aggCallCount)
741+
})
742+
}

0 commit comments

Comments
 (0)
0