8000 Merge #3307 into 3.5.1 · reactor/reactor-core@558173d · GitHub
[go: up one dir, main page]

Skip to content

Commit 558173d

Browse files
author
Oleh Dokuka
committed
Merge #3307 into 3.5.1
Signed-off-by: OlegDokuka <odokuka@vmware.com>
2 parents 69450bb + cfe1979 commit 558173d

File tree

3 files changed

+51
-7
lines changed

3 files changed

+51
-7
lines changed

reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ public Flux<Long> generateCompanion(Flux<RetrySignal> t) {
574574
//short-circuit delay == 0 case
575575
if (nextBackoff.isZero()) {
576576
return RetrySpec.applyHooks(copy, Mono.just(iteration),
577-
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry);
577+
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv);
578578
}
579579

580580
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -602,9 +602,8 @@ public Flux<Long> generateCompanion(Flux<RetrySignal> t) {
602602
jitter = random.nextLong(lowBound, highBound);
603603
}
604604
Duration effectiveBackoff = nextBackoff.plusMillis(jitter);
605-
return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff,
606-
backoffSchedulerSupplier.get()),
607-
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry);
605+
return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, backoffSchedulerSupplier.get()),
606+
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv);
608607
})
609608
.contextWrite(c -> Context.empty())
610609
);

reactor-core/src/main/java/reactor/util/retry/RetrySpec.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ else if (iteration >= maxAttempts) {
373373
return Mono.error(retryExhaustedGenerator.apply(this, copy));
374374
}
375375
else {
376-
return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry);
376+
return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry, cv);
377377
}
378378
})
379379
.contextWrite(c -> Context.empty())
@@ -389,7 +389,8 @@ static <T> Mono<T> applyHooks(RetrySignal copyOfSignal,
389389
final Consumer<RetrySignal> doPreRetry,
390390
final Consumer<RetrySignal> doPostRetry,
391391
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPreRetry,
392-
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry) {
392+
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry,
393+
final ContextView cv) {
393394
if (doPreRetry != NO_OP_CONSUMER) {
394395
try {
395396
doPreRetry.accept(copyOfSignal);
@@ -410,6 +411,6 @@ static <T> Mono<T> applyHooks(RetrySignal copyOfSignal,
410411
Mono<Void> preRetryMono = asyncPreRetry == NO_OP_BIFUNCTION ? Mono.empty() : asyncPreRetry.apply(copyOfSignal, Mono.empty());
411412
Mono<Void> postRetryMono = asyncPostRetry != NO_OP_BIFUNCTION ? asyncPostRetry.apply(copyOfSignal, postRetrySyncMono) : postRetrySyncMono;
412413

413-
return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn);
414+
return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn).contextWrite(cv);
414415
}
415416
}

reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525
import java.util.concurrent.atomic.AtomicInteger;
2626
import java.util.concurrent.atomic.AtomicLong;
27+
import java.util.concurrent.atomic.AtomicReference;
2728
import java.util.function.Function;
29+
import java.util.function.Supplier;
2830
import java.util.stream.Collectors;
2931

3032
import org.assertj.core.api.Assertions;
@@ -39,16 +41,19 @@
3941
import reactor.core.scheduler.Scheduler;
4042
import reactor.core.scheduler.Schedulers;
4143
import reactor.test.StepVerifier;
44+
import reactor.test.publisher.PublisherProbe;
4245
import reactor.test.scheduler.VirtualTimeScheduler;
4346
import reactor.test.subscriber.AssertSubscriber;
4447
import reactor.util.context.Context;
4548
import reactor.util.context.ContextView;
4649
import reactor.util.function.Tuple2;
4750
import reactor.util.retry.Retry;
4851
import reactor.util.retry.RetryBackoffSpec;
52+
import reactor.util.retry.RetrySpec;
4953

5054
import static org.assertj.core.api.Assertions.assertThat;
5155
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
56+
import static org.mockito.BDDMockito.given;
5257

5358
public class FluxRetryWhenTest {
5459

@@ -58,6 +63,45 @@ public class FluxRetryWhenTest {
5863
Flux<Integer> rangeError = Flux.concat(Flux.range(1, 2),
5964
Flux.error(new RuntimeException("forced failure 0")));
6065

66+
@Test
67+
// https://github.com/reactor/reactor-core/issues/3314
68+
void ensuresContextIsRestoredInRetryFunctions() {
69+
PublisherProbe<Void> doBeforeRetryProbe = PublisherProbe.empty();
70+
AtomicReference<ContextView> capturedContext = new AtomicReference<>();
71+
72+
RetrySpec spec = Retry.max(1)
73+
.doBeforeRetryAsync(
74+
retrySignal ->
75+
Mono.deferContextual(cv -> {
76+
capturedContext.set(cv);
77+
return doBeforeRetryProbe.mono();
78+
})
79+
);
80+
81+
Context context = Context.of("test", "test");
82+
83+
Mono.defer(new Supplier<Mono<?>>() {
84+
int index = 0;
85+
86+
@Override
87+
public Mono<?> get() {
88+
if (index++ == 0) {
89+
return Mono.error(new RuntimeException());
90+
} else {
91+
return Mono.just("someValue");
92+
}
93+
}
94+
})
95+
.retryWhen(spec)
96+
.contextWrite(context)
97+
.as(StepVerifier::create)
98+
.expectNext("someValue")
99+
.verifyComplete();
100+
101+
doBeforeRetryProbe.assertWasSubscribed();
102+
assertThat(capturedContext).hasValueMatching(c -> c.hasKey("test"));
103+
}
104+
61105
@Test
62106
//https://github.com/reactor/reactor-core/issues/3253
63107
public void shouldFailWhenOnErrorContinueEnabled() {

0 commit comments

Comments
 (0)
0