8000 Step 6: Adapters complete · blackbeltcoder/reactive-spring@dc2c5ce · GitHub
[go: up one dir, main page]

Skip to content

Commit dc2c5ce

Browse files
committed
Step 6: Adapters complete
1 parent 18a795e commit dc2c5ce

File tree

1 file changed

+11
-14
lines changed

1 file changed

+11
-14
lines changed

reactor/src/test/java/workshop/Step6Adapters.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@
1515
*/
1616
package workshop;
1717

18+
import io.reactivex.BackpressureStrategy;
1819
import io.reactivex.Flowable;
1920
import reactor.core.publisher.Flux;
2021
import reactor.core.publisher.Mono;
2122
import reactor.test.StepVerifier;
2223
import rx.Observable;
24+
import rx.RxReactiveStreams;
2325
import rx.Single;
2426

27+
import java.util.NoSuchElementException;
28+
2529
import org.junit.Test;
2630

2731
/**
@@ -36,8 +40,7 @@ public void adoptRxJava1ObservableToFlux() {
3640

3741
Observable<String> people = Observable.just("Jesse", "Hank");
3842

39-
// TODO: Use RxReactiveStreams to create a Flux from Observable.
40-
Flux<String> flux = Flux.empty();
43+
Flux<String> flux = Flux.from(RxReactiveStreams.toPublisher(people));
4144

4245
StepVerifier.create(flux).expectNext("Jesse", "Hank").verifyComplete();
4346
}
@@ -47,8 +50,7 @@ public void adoptRxJava1SingleToMono() {
4750

4851
Single<String> jesse = Single.just("Jesse");
4952

50-
// TODO: Use RxReactiveStreams to create a Flux from Observable.
51-
Mono<String> mono = Mono.empty();
53+
Mono<String> mono = Mono.from(RxReactiveStreams.toPublisher(jesse));
5254

5355
StepVerifier.create(mono).expectNext("Jesse").verifyComplete();
5456
}
@@ -58,20 +60,17 @@ public void adoptRxJava1EmptySingleToMono() {
5860

5961
Single<String> empty = Observable.<String> empty().toSingle();
6062

61-
// TODO: Use RxReactiveStreams to create a Flux from Observable.
62-
Mono<String> mono = Mono.empty();
63+
Mono<String> mono = Mono.from(RxReactiveStreams.toPublisher(empty));
6364

64-
// Expect a surprise here
65-
StepVerifier.create(mono).verifyComplete();
65+
StepVerifier.create(mono).expectError(NoSuchElementException.class).verify();
6666
}
6767

6868
@Test
6969
public void adoptRxJava2ObservableToFlux() {
7070

7171
io.reactivex.Observable<String> jesse = io.reactivex.Observable.just("Jesse");
7272

73-
// TODO: Use RxJava 2's Flowable to create a Flux
74-
Flux<String> flux = Flux.empty();
73+
Flux<String> flux = Flux.from(jesse.toFlowable(BackpressureStrategy.BUFFER));
7574

7675
StepVerifier.create(flux).expectNext("Jesse").verifyComplete();
7776
}
@@ -81,8 +80,7 @@ public void adoptFluxToRxJavaObservable() throws Exception {
8180

8281
Flux<String> people = Flux.just("Jesse", "Hank");
8382

84-
// TODO: Use RxReactiveStreams to create an Observable from Flux.
85-
Observable<String> observable = Observable.empty();
83+
Observable<String> observable = RxReactiveStreams.toObservable(people);
8684

8785
observable.test().awaitTerminalEvent().assertResult("Jesse", "Hank");
8886
}
@@ -92,8 +90,7 @@ public void adoptFluxToRxJava2Flowable() throws Exception {
9290

9391
Flux<String> people = Flux.just("Jesse", "Hank");
9492

95-
// TODO: Use RxReactiveStreams to create an Flowable from Flux.
96-
Flowable<String> flowable = Flowable.empty();
93+
Flowable<String> flowable = Flowable.fromPublisher(people);
9794

9895
flowable.test().await().assertResult("Jesse", "Hank").awaitTerminalEvent();
9996
}

0 commit comments

Comments
 (0)
0