15
15
*/
16
16
package workshop ;
17
17
18
+ import io .reactivex .BackpressureStrategy ;
18
19
import io .reactivex .Flowable ;
19
20
import reactor .core .publisher .Flux ;
20
21
import reactor .core .publisher .Mono ;
21
22
import reactor .test .StepVerifier ;
22
23
import rx .Observable ;
24
+ import rx .RxReactiveStreams ;
23
25
import rx .Single ;
24
26
27
+ import java .util .NoSuchElementException ;
28
+
25
29
import org .junit .Test ;
26
30
27
31
/**
@@ -36,8 +40,7 @@ public void adoptRxJava1ObservableToFlux() {
36
40
37
41
Observable <String > people = Observable .just ("Jesse" , "Hank" );
38
42
39
- // TODO: Use RxReactiveStreams to create a Flux from Observable.
40
- Flux <String > flux = Flux .empty ();
43
+ Flux <String > flux = Flux .from (RxReactiveStreams .toPublisher (people ));
41
44
42
45
StepVerifier .create (flux ).expectNext ("Jesse" , "Hank" ).verifyComplete ();
43
46
}
@@ -47,8 +50,7 @@ public void adoptRxJava1SingleToMono() {
47
50
48
51
Single <String > jesse = Single .just ("Jesse" );
49
52
50
- // TODO: Use RxReactiveStreams to create a Flux from Observable.
51
- Mono <String > mono = Mono .empty ();
53
+ Mono <String > mono = Mono .from (RxReactiveStreams .toPublisher (jesse ));
52
54
53
55
StepVerifier .create (mono ).expectNext ("Jesse" ).verifyComplete ();
54
56
}
@@ -58,20 +60,17 @@ public void adoptRxJava1EmptySingleToMono() {
58
60
59
61
Single <String > empty = Observable .<String > empty ().toSingle ();
60
62
61
- // TODO: Use RxReactiveStreams to create a Flux from Observable.
62
- Mono <String > mono = Mono .empty ();
63
+ Mono <String > mono = Mono .from (RxReactiveStreams .toPublisher (empty ));
63
64
64
- // Expect a surprise here
65
- StepVerifier .create (mono ).verifyComplete ();
65
+ StepVerifier .create (mono ).expectError (NoSuchElementException .class ).verify ();
66
66
}
67
67
68
68
@ Test
69
69
public void adoptRxJava2ObservableToFlux () {
70
70
71
71
io .reactivex .Observable <String > jesse = io .reactivex .Observable .just ("Jesse" );
72
72
73
- // TODO: Use RxJava 2's Flowable to create a Flux
74
- Flux <String > flux = Flux .empty ();
73
+ Flux <String > flux = Flux .from (jesse .toFlowable (BackpressureStrategy .BUFFER ));
75
74
76
75
StepVerifier .create (flux ).expectNext ("Jesse" ).verifyComplete ();
77
76
}
@@ -81,8 +80,7 @@ public void adoptFluxToRxJavaObservable() throws Exception {
81
80
82
81
Flux <String > people = Flux .just ("Jesse" , "Hank" );
83
82
84
- // TODO: Use RxReactiveStreams to create an Observable from Flux.
85
- Observable <String > observable = Observable .empty ();
83
+ Observable <String > observable = RxReactiveStreams .toObservable (people );
86
84
87
85
observable .test ().awaitTerminalEvent ().assertResult ("Jesse" , "Hank" );
88
86
}
@@ -92,8 +90,7 @@ public void adoptFluxToRxJava2Flowable() throws Exception {
92
90
93
91
Flux <String > people = Flux .just ("Jesse" , "Hank" );
94
92
95
- // TODO: Use RxReactiveStreams to create an Flowable from Flux.
96
- Flowable <String > flowable = Flowable .empty ();
93
+ Flowable <String > flowable = Flowable .fromPublisher (people );
97
94
98
95
flowable .test ().await ().assertResult ("Jesse" , "Hank" ).awaitTerminalEvent ();
99
96
}
0 commit comments