Description
When a subscription to an Observable.delay(delayErrors=true)
operator is disposed of concurrently to the subscription's emissions, the onNext
signals it issues to its downstream can be dropped at random.
See the following repro:
Scheduler scheduler = Schedulers.computation();
int maxValue = 10;
int maxAttempts = 1000;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
ConcurrentLinkedDeque<Notification<Integer>> sink = new ConcurrentLinkedDeque<>();
SequentialDisposable firstDisposable = new SequentialDisposable();
CountDownLatch latch = new CountDownLatch(1);
Observable<Integer> integers = Observable.range(0, maxValue);
Observable<Notification<Integer>> first = integers.delay(5, TimeUnit.MICROSECONDS, scheduler, true)
.materialize()
.doOnNext(sink::add)
.doOnSubscribe(firstDisposable::replace);
Observable<?> second = integers.delay(5, TimeUnit.MICROSECONDS, scheduler)
.doOnNext($ -> firstDisposable.dispose())
.doOnTerminate(latch::countDown);
first.subscribe();
second.subscribe();
latch.await();
boolean foundTermination = false;
int nextExpectedValue = 0;
List<String> errors = new ArrayList<>();
List<Notification<Integer>> notifications = List.copyOf(sink);
for (int i = 0; i < notifications.size(); i++) {
Notification<Integer> notification = notifications.get(i);
if (notification.isOnComplete() || notification.isOnError()) {
foundTermination = true;
}
if (notification.isOnNext()) {
if (foundTermination) {
errors.add("Found onNext after termination at index " + i);
}
int value = notification.getValue();
if (value != nextExpectedValue) {
errors.add((errors.size() + 1) + ". Wrong value at index " + i + " (expected " + nextExpectedValue +
", was " + value + ")");
}
nextExpectedValue = value + 1;
}
}
if (!errors.isEmpty()) {
throw new AssertionError(new StringBuilder()
.append("Attempt ")
.append(attempt)
.append(" failed.\n\t\tError(s):\n\t\t\t")
.append(String.join("\n\t\t\t", errors))
.append("\n\t\tNotifications: ")
.append(notifications)
.toString());
}
}
Here is an example failure:
java.lang.AssertionError: Attempt 11 failed.
Error(s):
1. Wrong value at index 2 (expected 2, was 3)
2. Wrong value at index 3 (expected 4, was 5)
3. Wrong value at index 6 (expected 8, was 9)
Notifications: [OnNextNotification[0], OnNextNotification[1], OnNextNotification[3], OnNextNotification[5], OnNextNotification[6], OnNextNotification[7], OnNextNotification[9]]
I understand that a subscription that is actively producing signals may continue to propagate those signals to the downstream when disposed of concurrently (i.e. disposal is "eventually consistent"). But I would expect that any signals emitted during or after the race would preserve their original ordering. In other words, once delay()
has caught up to its newly-disposed state and does not emit its N
th onNext
signal, it seems wrong for it to go on to emit its N+1
th signal.