2
2
using System . Threading . Channels ;
3
3
using Google . Protobuf ;
4
4
using Google . Protobuf . Collections ;
5
+ using Microsoft . Extensions . Options ;
5
6
using OddDotNet . Proto . Trace . V1 ;
6
7
using OpenTelemetry . Proto . Common . V1 ;
7
8
using OpenTelemetry . Proto . Trace . V1 ;
@@ -14,26 +15,26 @@ public class SpanSignalList : ISignalList<FlatSpan>
14
15
private readonly IChannelManager < FlatSpan > _channels ;
15
16
private readonly TimeProvider _timeProvider ;
16
17
private readonly ILogger < SpanSignalList > _logger ;
18
+ private readonly OddSettings _oddSettings ;
17
19
18
20
private static readonly List < Expirable < FlatSpan > > Spans = [ ] ;
19
21
20
22
private static readonly object Lock = new ( ) ;
21
- public SpanSignalList ( IChannelManager < FlatSpan > channels , TimeProvider timeProvider , ILogger < SpanSignalList > logger )
23
+ public SpanSignalList ( IChannelManager < FlatSpan > channels , TimeProvider timeProvider , ILogger < SpanSignalList > logger , IOptions < OddSettings > oddSettings )
22
24
{
23
25
_channels = channels ;
24
26
_timeProvider = timeProvider ;
25
27
_logger = logger ;
28
+ _oddSettings = oddSettings . Value ;
26
29
}
27
30
28
31
public void Add ( FlatSpan signal )
29
32
{
30
33
lock ( Lock )
31
34
{
32
- PruneExpiredSpans ( ) ;
33
35
34
- // Add the new span with 30 second expire
35
- // TODO make this configurable
36
- DateTimeOffset expiresAt = _timeProvider . GetUtcNow ( ) . AddSeconds ( 30 ) ;
36
+ // Add the new span with configured expiration
37
+ DateTimeOffset expiresAt = _timeProvider . GetUtcNow ( ) . AddMilliseconds ( _oddSettings . Cache . Expiration ) ;
37
38
Spans . Add ( new Expirable < FlatSpan > ( signal , expiresAt ) ) ;
38
39
39
40
// Notify any listening channels
@@ -54,8 +55,6 @@ public async IAsyncEnumerable<FlatSpan> QueryAsync(IQueryRequest<FlatSpan> reque
54
55
// Create the channel and populate it with the current contents of the span list
55
56
lock ( Lock )
56
57
{
57
- PruneExpiredSpans ( ) ;
58
-
59
58
foreach ( var expirableSpan in Spans )
60
59
{
61
60
channel . Writer . TryWrite ( expirableSpan . Signal ) ;
@@ -67,15 +66,15 @@ public async IAsyncEnumerable<FlatSpan> QueryAsync(IQueryRequest<FlatSpan> reque
67
66
68
67
while ( currentCount < takeCount && ! cts . IsCancellationRequested )
69
68
{
70
- FlatSpan ? span = null ;
69
+ FlatSpan ? span ;
71
70
try
72
71
{
73
72
await channel . Reader . WaitToReadAsync ( cts . Token ) ;
74
73
span = await channel . Reader . ReadAsync ( cts . Token ) ;
75
74
}
76
75
catch ( OperationCanceledException ex )
77
76
{
78
- _logger . LogWarning ( ex , "The query operation was cancelled" ) ;
77
+ _logger . LogDebug ( ex , "The query operation was cancelled" ) ;
79
78
break ;
80
79
}
81
80
@@ -103,6 +102,17 @@ public void Reset(IResetRequest<FlatSpan> request)
103
102
}
104
103
}
105
104
105
+ public void Prune ( )
106
+ {
107
+ _logger . LogDebug ( "Prune" ) ;
108
+ lock ( Lock )
109
+ {
110
+ DateTimeOffset currentTime = _timeProvider . GetUtcNow ( ) ;
111
+ int numRemoved = Spans . RemoveAll ( expirable => expirable . ExpireAt < currentTime ) ;
112
+ _logger . LogDebug ( "Removed {numRemoved} spans" , numRemoved ) ;
113
+ }
114
+ }
115
+
106
116
private static CancellationTokenSource GetQueryTimeout ( SpanQueryRequest spanRequest )
107
117
{
108
118
var defaultTimeout = new CancellationTokenSource ( TimeSpan . FromMilliseconds ( int . MaxValue ) ) ;
@@ -112,11 +122,11 @@ private static CancellationTokenSource GetQueryTimeout(SpanQueryRequest spanRequ
112
122
: new CancellationTokenSource ( TimeSpan . FromMilliseconds ( spanRequest . Duration . Milliseconds ) ) ;
113
123
}
114
124
115
- private void PruneExpiredSpans ( )
116
- {
117
- DateTimeOffset currentTime = _timeProvider . GetUtcNow ( ) ;
118
- Spans . RemoveAll ( expirable => expirable . ExpireAt < currentTime ) ;
119
- }
125
+ // private void PruneExpiredSpans()
126
+ // {
127
+ // DateTimeOffset currentTime = _timeProvider.GetUtcNow();
128
+ // Spans.RemoveAll(expirable => expirable.ExpireAt < currentTime);
129
+ // }
120
130
121
131
private static int GetTakeCount ( SpanQueryRequest spanQueryRequest ) => spanQueryRequest . Take . ValueCase switch
122
132
{
0 commit comments