24
24
import java .util .concurrent .atomic .AtomicBoolean ;
25
25
import java .util .concurrent .atomic .AtomicInteger ;
26
26
import java .util .concurrent .atomic .AtomicLong ;
27
+ import java .util .concurrent .atomic .AtomicReference ;
27
28
import java .util .function .Function ;
29
+ import java .util .function .Supplier ;
28
30
import java .util .stream .Collectors ;
29
31
30
32
import org .assertj .core .api .Assertions ;
39
41
import reactor .core .scheduler .Scheduler ;
40
42
import reactor .core .scheduler .Schedulers ;
41
43
import reactor .test .StepVerifier ;
44
+ import reactor .test .publisher .PublisherProbe ;
42
45
import reactor .test .scheduler .VirtualTimeScheduler ;
43
46
import reactor .test .subscriber .AssertSubscriber ;
44
47
import reactor .util .context .Context ;
45
48
import reactor .util .context .ContextView ;
46
49
import reactor .util .function .Tuple2 ;
47
50
import reactor .util .retry .Retry ;
48
51
import reactor .util .retry .RetryBackoffSpec ;
52
+ import reactor .util .retry .RetrySpec ;
49
53
50
54
import static org .assertj .core .api .Assertions .assertThat ;
51
55
import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
56
+ import static org .mockito .BDDMockito .given ;
52
57
53
58
public class FluxRetryWhenTest {
54
59
@@ -58,6 +63,45 @@ public class FluxRetryWhenTest {
58
63
Flux <Integer > rangeError = Flux .concat (Flux .range (1 , 2 ),
59
64
Flux .error (new RuntimeException ("forced failure 0" )));
60
65
66
+ @ Test
67
+ // https://github.com/reactor/reactor-core/issues/3314
68
+ void ensuresContextIsRestoredInRetryFunctions () {
69
+ PublisherProbe <Void > doBeforeRetryProbe = PublisherProbe .empty ();
70
+ AtomicReference <ContextView > capturedContext = new AtomicReference <>();
71
+
72
+ RetrySpec spec = Retry .max (1 )
73
+ .doBeforeRetryAsync (
74
+ retrySignal ->
75
+ Mono .deferContextual (cv -> {
76
+ capturedContext .set (cv );
77
+ return doBeforeRetryProbe .mono ();
78
+ })
79
+ );
80
+
81
+ Context context = Context .of ("test" , "test" );
82
+
83
+ Mono .defer (new Supplier <Mono <?>>() {
84
+ int index = 0 ;
85
+
86
+ @ Override
87
+ public Mono <?> get () {
88
+ if (index ++ == 0 ) {
89
+ return Mono .error (new RuntimeException ());
90
+ } else {
91
+ return Mono .just ("someValue" );
92
+ }
93
+ }
94
+ })
95
+ .retryWhen (spec )
96
+ .contextWrite (context )
97
+ .as (StepVerifier ::create )
98
+ .expectNext ("someValue" )
99
+ .verifyComplete ();
100
+
101
+ doBeforeRetryProbe .assertWasSubscribed ();
102
+ assertThat (capturedContext ).hasValueMatching (c -> c .hasKey ("test" ));
103
+ }
104
+
61
105
@ Test
62
106
//https://github.com/reactor/reactor-core/issues/3253
63
107
public void shouldFailWhenOnErrorContinueEnabled () {
0 commit comments