22
22
import reactor .core .scheduler .Schedulers ;
23
23
import reactor .test .StepVerifier ;
24
24
25
- import java .util .Collections ;
26
25
import java .util .List ;
27
26
import java .util .concurrent .Callable ;
28
27
import java .util .concurrent .CompletableFuture ;
28
+ import java .util .concurrent .ThreadLocalRandom ;
29
29
import java .util .concurrent .atomic .AtomicLong ;
30
30
31
31
import org .junit .Test ;
@@ -42,8 +42,7 @@ public void synchronizeMono() {
42
42
43
43
Mono <String > tuco = Mono .just ("Tuco" ).subscribeOn (Schedulers .elastic ());
44
44
45
- // TODO: Obtain this value from the Mono above using hard synchronization
46
- String result = "" ;
45
+ String result = tuco .block ();
47
46
48
47
assertThat (result ).isEqualTo ("Tuco" );
49
48
}
@@ -53,8 +52,7 @@ public void synchronizeFlux() {
53
52
54
53
Flux <String > salamancas = Flux .just ("Hector" , "Tuco" );
55
54
56
- // TODO: Obtain this value from the Flux above using hard synchronization
57
- List <String > result = Collections .emptyList ();
55
+ List <String > result = salamancas .collectList ().block ();
58
56
59
57
assertThat (result ).contains ("Hector" , "Tuco" );
60
58
}
@@ -64,8 +62,7 @@ public void adoptToCompletableFuture() {
64
62
65
63
Mono <String > tuco = Mono .just ("Tuco" ).subscribeOn (Schedulers .elastic ());
66
64
67
- // TODO: Replace this line to create a CompletableFuture from Mono
68
- CompletableFuture <String > future = CompletableFuture .completedFuture ("foo" );
65
+ CompletableFuture <String > future = tuco .toFuture ();
69
66
70
67
future .thenAccept (s -> assertThat (s ).isEqualTo ("Tuco" )).join ();
71
68
}
@@ -78,13 +75,12 @@ public void blockingToReactive() {
78
75
return "Mike" ;
79
76
};
80
77
81
- // TODO: Observe the output sequence. Use subscribeOn and publishOn to adjust context switching behavior
82
- Mono <String > naive = Mono .fromCallable (takesAWhile )
78
+ Mono <String > naive = Mono .fromCallable (takesAWhile ).publishOn (Schedulers .elastic ())
83
79
.doOnSubscribe (it -> System .out .println (Thread .currentThread ().getName () + ": Subscribe" ))
84
80
.doOnSuccess (it -> System .out .println (Thread .currentThread ().getName () + ": Success" ));
85
81
86
82
System .out .println ("Before subscribe" );
87
- naive .subscribe ();
83
+ naive .subscribeOn ( Schedulers . parallel ()). block ();
88
84
System .out .println ("After subscribe" );
89
85
}
90
86
@@ -93,22 +89,26 @@ public void reactivePushFutureToMono() {
93
89
94
90
CompletableFuture <String > saul = new CompletableFuture <>();
95
91
96
- // TODO: Create a Mono from the CompletableFuture.
97
- Mono <String > naive = Mono .empty ();
92
+ Mono <String > naive = Mono .fromFuture (saul );
93
+
94
+ saul .complete ("Saul" );
98
95
99
96
System .out .println ("Before subscribe" );
100
97
naive .doOnSuccess (System .out ::println ).subscribe ();
101
98
System .out .println ("After subscribe" );
102
-
103
- // TODO: Move the completion before Mono.subscribe to explore the behavior
104
- saul .complete ("Saul" );
105
99
}
106
100
107
101
@ Test
108
102
public void reactivePullGenerator () {
109
103
110
104
Flux <Double > doubleStream = Flux .generate (AtomicLong ::new , (o , synchronousSink ) -> {
111
105
106
+ if (o .incrementAndGet () > 10 ) {
107
+ synchronousSink .complete ();
108
+ } else {
109
+ synchronousSink .next (ThreadLocalRandom .current ().nextDouble ());
110
+ }
111
+
112
112
return o ;
113
113
});
114
114
0 commit comments