@@ -26,10 +26,40 @@ type timer struct {
26
26
// mu protects reads and writes to all fields, with exceptions noted below.
27
27
mu mutex
28
28
29
- astate atomic.Uint8 // atomic copy of state bits at last unlock
30
- state uint8 // state bits
31
- isChan bool // timer has a channel; immutable; can be read without lock
32
- blocked uint32 // number of goroutines blocked on timer's channel
29
+ astate atomic.Uint8 // atomic copy of state bits at last unlock
30
+ state uint8 // state bits
31
+ isChan bool // timer has a channel; immutable; can be read without lock
32
+
33
+ // isSending is used to handle races between running a
34
+ // channel timer and stopping or resetting the timer.
35
+ // It is used only for channel timers (t.isChan == true).
36
+ // The lowest zero bit is set when about to send a value on the channel,
37
+ // and cleared after sending the value.
38
+ // The stop/reset code uses this to detect whether it
39
+ // stopped the channel send.
40
+ //
41
+ // An isSending bit is set only when t.mu is held.
42
+ // An isSending bit is cleared only when t.sendLock is held.
43
+ // isSending is read only when both t.mu and t.sendLock are held.
44
+ //
10BC0
div>
45
+ // Setting and clearing Uint8 bits handles the case of
46
+ // a timer that is reset concurrently with unlockAndRun.
47
+ // If the reset timer runs immediately, we can wind up with
48
+ // concurrent calls to unlockAndRun for the same timer.
49
+ // Using matched bit set and clear in unlockAndRun
50
+ // ensures that the value doesn't get temporarily out of sync.
51
+ //
52
+ // We use a uint8 to keep the timer struct small.
53
+ // This means that we can only support up to 8 concurrent
54
+ // runs of a timer, where a concurrent run can only occur if
55
+ // we start a run, unlock the timer, the timer is reset to a new
56
+ // value (or the ticker fires again), it is ready to run,
57
+ // and it is actually run, all before the first run completes.
58
+ // Since completing a run is fast, even 2 concurrent timer runs are
59
+ // nearly impossible, so this should be safe in practice.
60
+ isSending atomic.Uint8
61
+
62
+ blocked uint32 // number of goroutines blocked on timer's channel
33
63
34
64
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
35
65
// each time calling f(arg, seq, delay) in the timer goroutine, so f must be
@@ -431,6 +461,15 @@ func (t *timer) stop() bool {
431
461
// Stop any future sends with stale values.
432
462
// See timer.unlockAndRun.
433
463
t .seq ++
464
+
465
+ // If there is currently a send in progress,
466
+ // incrementing seq is going to prevent that
467
+ // send from actually happening. That means
468
+ // that we should return true: the timer was
469
+ // stopped, even though t.when may be zero.
470
+ if t .isSending .Load () > 0 {
471
+ pending = true
472
+ }
434
473
}
435
474
t .unlock ()
436
475
if ! async && t .isChan {
@@ -525,6 +564,15 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
525
564
// Stop any future sends with stale values.
526
565
// See timer.unlockAndRun.
527
566
t .seq ++
567
+
568
+ // If there is currently a send in progress,
569
+ // incrementing seq is going to prevent that
570
+ // send from actually happening. That means
571
+ // that we should return true: the timer was
572
+ // stopped, even though t.when may be zero.
573
+ if t .isSending .Load () > 0 {
574
+ pending = true
575
+ }
528
576
}
529
577
t .unlock ()
530
578
if ! async && t .isChan {
@@ -1013,6 +1061,24 @@ func (t *timer) unlockAndRun(now int64) {
1013
1061
}
1014
1062
t .updateHeap ()
1015
1063
}
1064
+
1065
+ async := debug .asynctimerchan .Load () != 0
1066
+ var isSendingClear uint8
1067
+ if ! async && t .isChan {
1068
+ // Tell Stop/Reset that we are sending a value.
1069
+ // Set the lowest zero bit.
1070
+ // We do this awkward step because atomic.Uint8
1071
+ // doesn't support Add or CompareAndSwap.
1072
+ // We only set bits with t locked.
1073
+ v := t .isSending .Load ()
1074
+ i := sys .TrailingZeros8 (^ v )
1075
+ if i == 8 {
1076
+ throw ("too many concurrent timer firings" )
1077
+ }
1078
+ isSendingClear = 1 << i
1079
+ t .isSending .Or (isSendingClear )
1080
+ }
1081
+
1016
1082
t .unlock ()
1017
1083
1018
1084
if raceenabled {
@@ -1028,7 +1094,6 @@ func (t *timer) unlockAndRun(now int64) {
1028
1094
ts .unlock ()
1029
1095
}
1030
1096
1031
- async := debug .asynctimerchan .Load () != 0
1032
1097
if ! async && t .isChan {
1033
1098
// For a timer channel, we want to make sure that no stale sends
1034
1099
// happen after a t.stop or t.modify, but we cannot hold t.mu
@@ -1044,6 +1109,10 @@ func (t *timer) unlockAndRun(now int64) {
1044
1109
// and double-check that t.seq is still the seq value we saw above.
1045
1110
// If not, the timer has been updated and we should skip the send.
1046
1111
// We skip the send by reassigning f to a no-op function.
1112
+ //
1113
+ // The isSending field tells t.stop or t.modify that we have
1114
+ // started to send the value. That lets them correctly return
1115
+ // true meaning that no value was sent.
1047
1116
lock (& t .sendLock )
1048
1117
if t .seq != seq {
1049
1118
f = func (any , uintptr , int64 ) {}
@@ -1053,6 +1122,9 @@ func (t *timer) unlockAndRun(now int64) {
1053
1122
f (arg , seq , delay )
1054
1123
1055
1124
if ! async && t .isChan {
1125
+ // We are no longer sending a value.
1126
+ t .isSending .And (^ isSendingClear )
1127
+
1056
1128
unlock (& t .sendLock )
1057
1129
}
1058
1130
0 commit comments