8000 Step 7: Blocking and async complete · blackbeltcoder/reactive-spring@c497928 · GitHub
[go: up one dir, main page]

Skip to content

Commit c497928

Browse files
committed
Step 7: Blocking and async complete
1 parent dc2c5ce commit c497928

File tree

1 file changed

+15
-15
lines changed

1 file changed

+15
-15
lines changed

reactor/src/test/java/workshop/Step7BlockingAndAsync.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import reactor.core.scheduler.Schedulers;
2323
import reactor.test.StepVerifier;
2424

25-
import java.util.Collections;
2625
import java.util.List;
2726
import java.util.concurrent.Callable;
2827
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ThreadLocalRandom;
2929
import java.util.concurrent.atomic.AtomicLong;
3030

3131
import org.junit.Test;
@@ -42,8 +42,7 @@ public void synchronizeMono() {
4242

4343
Mono<String> tuco = Mono.just("Tuco").subscribeOn(Schedulers.elastic());
4444

45-
// TODO: Obtain this value from the Mono above using hard synchronization
46-
String result = "";
45+
String result = tuco.block();
4746

4847
assertThat(result).isEqualTo("Tuco");
4948
}
@@ -53,8 +52,7 @@ public void synchronizeFlux() {
5352

5453
Flux<String> salamancas = Flux.just("Hector", "Tuco");
5554

56-
// TODO: Obtain this value from the Flux above using hard synchronization
57-
List<String> result = Collections.emptyList();
55+
List<String> result = salamancas.collectList().block();
5856

5957
assertThat(result).contains("Hector", "Tuco");
6058
}
@@ -64,8 +62,7 @@ public void adoptToCompletableFuture() {
6462

6563
Mono<String> tuco = Mono.just("Tuco").subscribeOn(Schedulers.elastic());
6664

67-
// TODO: Replace this line to create a CompletableFuture from Mono
68-
CompletableFuture<String> future = CompletableFuture.completedFuture("foo");
65+
CompletableFuture<String> future = tuco.toFuture();
6966

7067
future.thenAccept(s -> assertThat(s).isEqualTo("Tuco")).join();
7168
}
@@ -78,13 +75,12 @@ public void blockingToReactive() {
7875
return "Mike";
7976
};
8077

81-
// TODO: Observe the output sequence. Use subscribeOn and publishOn to adjust context switching behavior
82-
Mono<String> naive = Mono.fromCallable(takesAWhile)
78+
Mono<String> naive = Mono.fromCallable(takesAWhile).publishOn(Schedulers.elastic())
8379
.doOnSubscribe(it -> System.out.println(Thread.currentThread().getName() + ": Subscribe"))
8480
.doOnSuccess(it -> System.out.println(Thread.currentThread().getName() + ": Success"));
8581

8682
System.out.println("Before subscribe");
87-
naive.subscribe();
83+
naive.subscribeOn(Schedulers.parallel()).block();
8884
System.out.println("After subscribe");
8985
}
9086

@@ -93,22 +89,26 @@ public void reactivePushFutureToMono() {
9389

9490
CompletableFuture<String> saul = new CompletableFuture<>();
9591

96-
// TODO: Create a Mono from the CompletableFuture.
97-
Mono<String> naive = Mono.empty();
92+
Mono<String> naive = Mono.fromFuture(saul);
93+
94+
saul.complete("Saul");
9895

9996
System.out.println("Before subscribe");
10097
naive.doOnSuccess(System.out::println).subscribe();
10198
System.out.println("After subscribe");
102-
103-
// TODO: Move the completion before Mono.subscribe to explore the behavior
104-
saul.complete("Saul");
10599
}
106100

107101
@Test
108102
public void reactivePullGenerator() {
109103

110104
Flux<Double> doubleStream = Flux.generate(AtomicLong::new, (o, synchronousSink) -> {
111105

106+
if (o.incrementAndGet() > 10) {
107+
synchronousSink.complete();
108+
} else {
109+
synchronousSink.next(ThreadLocalRandom.current().nextDouble());
110+
}
111+
112112
return o;
113113
});
114114

0 commit comments

Comments
 (0)
0