8000 Merge #3312 into 3.5.1 · reactor/reactor-core@94e7226 · GitHub
[go: up one dir, main page]

Skip to content

Commit 94e7226

Browse files
author
Oleh Dokuka
committed
Merge #3312 into 3.5.1
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
2 parents 558173d + 9ae9858 commit 94e7226

File tree

5 files changed

+211
-14
lines changed

5 files changed

+211
-14
lines changed

reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ void drainAsync() {
382382
sp = iterable.spliterator();
383383
itFinite = FluxIterable.checkFinite(sp);
384384

385-
isEmpty = itFinite && sp.estimateSize() == 0;
385+
isEmpty = itFinite ? sp.estimateSize() == 0 : !hasNext(sp);
386386
}
387387
catch (Throwable exc) {
388388
sp = null;

reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,9 @@ static <T> boolean checkFinite(Spliterator<? extends T> spliterator) {
6161
@Nullable
6262
private final Runnable onClose;
6363

64-
FluxIterable(Iterable<? extends T> iterable, @Nullable Runnable onClose) {
65-
this.iterable = Objects.requireNonNull(iterable, "iterable");
66-
this.onClose = onClose;
67-
}
68-
6964
FluxIterable(Iterable<? extends T> iterable) {
70-
this(iterable, null);
65+
this.iterable = Objects.requireNonNull(iterable, "iterable");
66+
this.onClose = null;
7167
}
7268

7369
@Override
@@ -158,11 +154,51 @@ static <T> void subscribe(CoreSubscriber<? super T> s, Spliterator<? extends T>
158154
}
159155

160156
if (s instanceof ConditionalSubscriber) {
161-
s.onSubscribe(new IterableSubscriptionConditional<>((ConditionalSubscriber<? super T>) s,
162-
sp, knownToBeFinite, onClose));
157+
IterableSubscriptionConditional<? extends T> isc =
158+
new IterableSubscriptionConditional<>((ConditionalSubscriber<? super T>) s,
159+
sp,
160+
knownToBeFinite,
161+
onClose);
162+
163+
boolean hasNext;
164+
try {
165+
hasNext = isc.hasNext();
166+
}
167+
catch (Throwable ex) {
168+
Operators.error(s, ex);
169+
isc.onCloseWithDropError();
170+
return;
171+
}
172+
173+
if (!hasNext) {
174+
Operators.complete(s);
175+
isc.onCloseWithDropError();
176+
return;
177+
}
178+
179+
s.onSubscribe(isc);
163180
}
164181
else {
165-
s.onSubscribe(new IterableSubscription<>(s, sp, knownToBeFinite, onClose));
182+
IterableSubscription<? extends T> is =
183+
new IterableSubscription<>(s, sp, knownToBeFinite, onClose);
184+
185+
boolean hasNext;
186+
try {
187+
hasNext = is.hasNext();
188+
}
189+
catch (Throwable ex) {
190+
Operators.error(s, ex);
191+
is.onCloseWithDropError();
192+
return;
193+
}
194+
195+
if (!hasNext) {
196+
Operators.complete(s);
197+
is.onCloseWithDropError();
198+
return;
199+
}
200+
201+
s.onSubscribe(is);
166202
}
167203
}
168204

reactor-core/src/main/java/reactor/core/publisher/FluxStream.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616

1717
package reactor.core.publisher;
1818

19-
import java.util.Iterator;
2019
import java.util.Objects;
2120
import java.util.Spliterator;
22-
import java.util.Spliterators;
2321
import java.util.function.Supplier;
2422
import java.util.stream.Stream;
2523

reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,34 @@
2323
import java.util.Arrays;
2424
import java.util.Iterator;
2525
import java.util.List;
26+
import java.util.Spliterator;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.function.Function;
2829
import java.util.stream.Collectors;
2930
import java.util.stream.IntStream;
31+
import java.util.stream.Stream;
3032

3133
import org.assertj.core.api.Assertions;
3234
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.params.provider.MethodSource;
36+
import org.mockito.Mockito;
3337
import org.reactivestreams.Subscription;
3438
import reactor.core.CoreSubscriber;
3539
import reactor.core.Fuseable;
3640
import reactor.core.Scannable;
3741
import reactor.core.Scannable.Attr;
3842
import reactor.core.scheduler.Schedulers;
43+
import reactor.test.ParameterizedTestWithName;
3944
import reactor.test.StepVerifier;
4045
import reactor.test.publisher.FluxOperatorTest;
4146
import reactor.test.subscriber.AssertSubscriber;
4247
import reactor.util.concurrent.Queues;
4348
import reactor.util.context.Context;
4449

4550
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
51+
import static org.mockito.ArgumentMatchers.any;
52+
import static org.mockito.Mockito.mock;
53+
import static org.mockito.Mockito.when;
4654

4755
public class FluxFlattenIterableTest extends FluxOperatorTest<String, String> {
4856

@@ -678,6 +686,47 @@ public Context currentContext() {
678686
assertThat(test.currentKnownToBeFinite).as("knownFinite reset").isFalse();
679687
}
680688

689+
@ParameterizedTestWithName
690+
@MethodSource("reactor.core.publisher.FluxIterableTest#factory")
691+
public void testFluxIterableEmptyCase(Function<Flux, Flux> fn) {
692+
Iterable<String> iterable = mock(Iterable.class);
693+
Mockito.when(iterable.spliterator())
694+
.thenReturn(mock(Spliterator.class));
695+
696+
StepVerifier.create(
697+
Flux.just(1)
698+
.hide()
699+
.flatMapIterable(__ -> iterable)
700+
.as(fn)
701+
.next()
702+
)
703+
.expectSubscription()
704+
.expectComplete()
705+
.verify();
706+
}
707+
708+
@ParameterizedTestWithName
709+
@MethodSource("reactor.core.publisher.FluxIterableTest#factory")
710+
public void testFluxIterableErrorHasNext(Function<Flux, Flux> fn) {
711+
Iterable<String> iterable = mock(Iterable.class);
712+
Spliterator mock = mock(Spliterator.class);
713+
Mockito.when(iterable.spliterator())
714+
.thenReturn(mock);
715+
716+
when(mock.tryAdvance(any())).thenThrow();
717+
718+
StepVerifier.create(
719+
Flux.just(1)
720+
.hide()
721+
.flatMapIterable(__ -> iterable)
722+
.as(fn)
723+
.next()
724+
)
725+
.expectSubscription()
726+
.expectError()
727+
.verify();
728+
}
729+
681730
static class ReferenceCounted {
682731

683732
int refCount = 1;

reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,16 @@
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.Function;
26+
import java.util.stream.Stream;
2527

2628
import org.assertj.core.api.Assertions;
2729
import org.assertj.core.api.InstanceOfAssertFactories;
2830
import org.junit.jupiter.api.Test;
2931
import org.junit.jupiter.api.Timeout;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.MethodSource;
34+
import org.junit.jupiter.params.provider.ValueSource;
3035
import org.mockito.Mockito;
3136
import org.reactivestreams.Subscriber;
3237
import org.reactivestreams.Subscription;
@@ -36,6 +41,7 @@
3641
import reactor.core.Scannable;
3742
import reactor.core.scheduler.Schedulers;
3843
import reactor.test.MockUtils;
44+
import reactor.test.ParameterizedTestWithName;
3945
import reactor.test.StepVerifier;
4046
import reactor.test.subscriber.AssertSubscriber;
4147
import reactor.util.annotation.NonNull;
@@ -44,11 +50,119 @@
4450

4551
import static org.assertj.core.api.Assertions.assertThat;
4652
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
53+
import static org.mockito.ArgumentMatchers.any;
54+
import static org.mockito.Mockito.mock;
55+
import static org.mockito.Mockito.when;
4756

4857
public class FluxIterableTest {
4958

5059
final Iterable<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
5160

61+
static Stream<Function<Flux, Flux>> factory() {
62+
return Stream.of(new Function<Flux, Flux>() {
63+
@Override
64+
public Flux apply(Flux flux) {
65+
return flux;
66+
}
67+
68+
@Override
69+
public String toString() {
70+
return "normal fast-path";
71+
}
72+
}, new Function<Flux, Flux>() {
73+
@Override
74+
public Flux apply(Flux flux) {
75+
return flux.filter(__ -> true);
76+
}
77+
78+
@Override
79+
public String toString() {
80+
return "conditional fast-path";
81+
}
82+
}, new Function<Flux, Flux>() {
83+
@Override
84+
public Flux apply(Flux flux) {
85+
return flux.limitRate(1);
86+
}
87+
88+
@Override
89+
public String toString() {
90+
return "fused";
91+
}
92+
}, new Function<Flux, Flux>() {
93+
@Override
94+
public Flux apply(Flux flux) {
95+
return flux.hide()
96+
.limitRate(1);
97+
}
98+
99+
@Override
100+
public String toString() {
101+
return "normal slow-path";
102+
}
103+
}, new Function<Flux, Flux>() {
104+
@Override
105+
public Flux apply(Flux flux) {
106+
return flux.filter(__ -> true)
107+
.hide()
108+
.limitRate(1);
109+
}
110+
111+
@Override
112+
public String toString() {
113+
return "conditional slow-path";
114+
}
115+
}, new Function<Flux, Flux>() {
116+
@Override
117+
public Flux apply(Flux flux) {
118+
return flux.filter(__ -> true)
119+
.limitRate(1);
120+
}
121+
122+
@Override
123+
public String toString() {
124+
return "conditional-fused";
125+
}
126+
});
127+
}
128+
129+
@ParameterizedTestWithName
130+
@MethodSource("factory")
131+
public void testFluxIterableEmptyCase(Function<Flux, Flux> fn) {
132+
Iterable<String> iterable = mock(Iterable.class);
133+
Mockito.when(iterable.spliterator())
134+
.thenReturn(mock(Spliterator.class));
135+
136+
StepVerifier.create(
137+
Flux.fromIterable(iterable)
138+
.as(fn)
139+
.next()
140+
)
141+
.expectSubscription()
142+
.expectComplete()
143+
.verify();
144+
}
145+
146+
@ParameterizedTestWithName
147+
@MethodSource("factory")
148+
public void testFluxIterableErrorHasNext(Function<Flux, Flux> fn) {
149+
Iterable<String> iterable = mock(Iterable.class);
150+
Spliterator mock = mock(Spliterator.class);
151+
Mockito.when(iterable.spliterator())
152+
.thenReturn(mock);
153+
154+
when(mock.tryAdvance(any())).thenThrow();
155+
156+
StepVerifier.create(
157+
Flux.fromIterable(iterable)
158+
.as(fn)
159+
.next()
160+
)
161+
.expectSubscription()
162+
.expectError()
163+
.verify();
164+
}
165+
52166
@Test
53167
//https://github.com/reactor/reactor-core/issues/3295
54168
public void useIterableOncePerSubscriber() {
@@ -253,7 +367,7 @@ public void scanSubscription() {
253367
@Test
254368
public void scanConditionalSubscription() {
255369
@SuppressWarnings("unchecked")
256-
Fuseable.ConditionalSubscriber<? super String> actual = Mockito.mock(MockUtils.TestScannableConditionalSubscriber.class);
370+
Fuseable.ConditionalSubscriber<? super String> actual = mock(MockUtils.TestScannableConditionalSubscriber.class);
257371
Mockito.when(actual.currentContext()).thenReturn(Context.empty());
258372
FluxIterable.IterableSubscriptionConditional<String> test =
259373
new FluxIterable.IterableSubscriptionConditional<>(actual, Collections.singleton("test").spliterator(), true);
@@ -328,7 +442,7 @@ void smokeTestIterableConditionalSubscriptionWithInfiniteIterable() {
328442
Context discardingContext = Operators.enableOnDiscard(Context.empty(), v -> { });
329443

330444
@SuppressWarnings("unchecked")
331-
Fuseable.ConditionalSubscriber<Integer> testSubscriber = Mockito.mock(Fuseable.ConditionalSubscriber.class);
445+
Fuseable.ConditionalSubscriber<Integer> testSubscriber = mock(Fuseable.ConditionalSubscriber.class);
332446
Mockito.when(testSubscriber.currentContext()).thenReturn(discardingContext);
333447

334448
Spliterator<Integer> iterator = Spliterators.spliteratorUnknownSize(new Iterator<Integer>() {

0 commit comments

Comments
 (0)
0