@@ -613,3 +613,130 @@ func TestPipelineWithMultipleReaders(t *testing.T) {
613
613
assert .Equal (t , int64 (2 ), rm .ScopeMetrics [0 ].Metrics [0 ].Data .(metricdata.Sum [int64 ]).DataPoints [0 ].Value )
614
614
}
615
615
}
616
+
617
+ func TestPipelineProduceErrors (t * testing.T ) {
618
+ // Create a test pipeline with aggregations
619
+ pipeReader := NewManualReader ()
620
+ pipe := newPipeline (nil , pipeReader , nil , exemplar .AlwaysOffFilter )
621
+
622
+ // Set up an observable with callbacks
623
+ var testObsID observableID [int64 ]
624
+ aggBuilder := aggregate.Builder [int64 ]{Temporality : metricdata .CumulativeTemporality }
625
+ measure , _ := aggBuilder .Sum (true )
626
+ pipe .addInt64Measure (testObsID , []aggregate.Measure [int64 ]{measure })
627
+
628
+ // Add an aggregation that just sets the data point value to the number of times the aggregation is invoked
629
+ aggCallCount := 0
630
+ inst := instrumentSync {
631
+ name : "test-metric" ,
632
+ description : "test description" ,
633
+ unit : "test unit" ,
634
+ compAgg : func (dest * metricdata.Aggregation ) int {
635
+ aggCallCount ++
636
+
637
+ * dest = metricdata.Sum [int64 ]{
638
+ Temporality : metricdata .CumulativeTemporality ,
639
+ IsMonotonic : false ,
640
+ DataPoints : []metricdata.DataPoint [int64 ]{{Value : int64 (aggCallCount )}},
641
+ }
642
+ return aggCallCount
643
+ },
644
+ }
645
+ pipe .addSync (instrumentation.Scope {Name : "test" }, inst )
646
+
647
+ ctx , cancelCtx := context .WithCancel (context .Background ())
648
+ var shouldCancelContext bool // When true, the second callback cancels ctx
649
+ var shouldReturnError bool // When true, the third callback returns an error
650
+ var callbackCounts [3 ]int
651
+
652
+ // Callback 1: cancels the context during execution but continues to populate data
653
+ pipe .callbacks = append (pipe .callbacks , func (ctx context.Context ) error {
654
+ callbackCounts [0 ]++
655
+ for _ , m := range pipe .int64Measures [testObsID ] {
656
+ m (ctx , 123 , * attribute .EmptySet ())
657
+ }
658
+ return nil
659
+ })
660
+
661
+ // Callback 2: populates int64 observable data
662
+ pipe .callbacks = append (pipe .callbacks , func (ctx context.Context ) error {
663
+ callbackCounts [1 ]++
664
+ if shouldCancelContext {
665
+ cancelCtx ()
666
+ }
667
+ return nil
668
+ })
669
+
670
+ // Callback 3: return an error
671
+ pipe .callbacks = append (pipe .callbacks , func (ctx context.Context ) error {
672
+ callbackCounts [2 ]++
673
+ if shouldReturnError {
674
+ return fmt .Errorf ("test callback error" )
675
+ }
676
+ return nil
677
+ })
678
+
679
+ assertMetrics := func (rm * metricdata.ResourceMetrics , expectVal int64 ) {
680
+ require .Len (t , rm .ScopeMetrics , 1 )
681
+ require .Len (t , rm .ScopeMetrics [0 ].Metrics , 1 )
682
+ metricdatatest .AssertEqual (t , metricdata.Metrics {
683
+ Name : inst .name ,
684
+ Description : inst .description ,
685
+ Unit : inst .unit ,
686
+ Data : metricdata.Sum [int64 ]{
687
+ Temporality : metricdata .CumulativeTemporality ,
688
+ IsMonotonic : false ,
689
+ DataPoints : []metricdata.DataPoint [int64 ]{{Value : expectVal }},
690
+ },
691
+ }, rm .ScopeMetrics [0 ].Metrics [0 ], metricdatatest .IgnoreTimestamp ())
692
+ }
693
+
694
+ t .Run ("no errors" , func (t * testing.T ) {
695
+ var rm metricdata.ResourceMetrics
696
+ err := pipe .produce (ctx , & rm )
697
+ require .NoError (t , err )
698
+
699
+ assert .Equal (t , [3 ]int {1 , 1 , 1 }, callbackCounts )
700
+ assert .Equal (t , 1 , aggCallCount )
701
+
702
+ assertMetrics (& rm , 1 )
703
+ })
704
+
705
+ t .Run ("callback returns error" , func (t * testing.T ) {
706
+ shouldReturnError = true
707
+
708
+ var rm metricdata.ResourceMetrics
709
+ err := pipe .produce (ctx , & rm )
710
+ require .ErrorContains (t , err , "test callback error" )
711
+
712
+ // Even though a callback returned an error, the agg function is still called
713
+ assert .Equal (t , [3 ]int {2 , 2 , 2 }, callbackCounts )
714
+ assert .Equal (t , 2 , aggCallCount )
715
+
716
+ assertMetrics (& rm , 2 )
717
+ })
718
+
719
+ t .Run ("context canceled during produce" , func (t * testing.T ) {
720
+ shouldCancelContext = true
721
+
AE7F
722
+ var rm metricdata.ResourceMetrics
723
+ err := pipe .produce (ctx , & rm )
724
+ require .ErrorContains (t , err , "test callback error" )
725
+
726
+ // Even though the context was canceled midway through invoking callbacks,
727
+ // all remaining callbacks and agg functions are still called
728
+ assert .Equal (t , [3 ]int {3 , 3 , 3 }, callbackCounts )
729
+ assert .Equal (t , 3 , aggCallCount )
730
+ })
731
+
732
+ t .Run ("context already cancelled" , func (t * testing.T ) {
733
+ var output metricdata.ResourceMetrics
734
+ err := pipe .produce (ctx , & output )
735
+ require .ErrorIs (t , err , context .Canceled )
736
+
737
+ // No callbacks or agg functions are called since the context was canceled prior to invoking
738
+ // the produce method
739
+ assert .Equal (t , [3 ]int {3 , 3 , 3 }, callbackCounts )
740
+ assert .Equal (t , 3 , aggCallCount )
741
+ })
742
+ }
0 commit comments