Observable.delay(delayErrors=true) drops onNext signals at random when disposed of concurrently · Issue #7851 · ReactiveX/RxJava · GitHub
Closed
@joepembe

Description

When a subscription to an Observable.delay(delayErrors=true) operator is disposed of concurrently with the subscription's emissions, some of the onNext signals it issues to its downstream can be dropped at random.

See the following repro:

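// Method-body fragment. Imports assumed (RxJava 3.x package names):
//   io.reactivex.rxjava3.core.{Notification, Observable, Scheduler}
//   io.reactivex.rxjava3.schedulers.Schedulers
//   io.reactivex.rxjava3.internal.disposables.SequentialDisposable
//   java.util.{ArrayList, List}
//   java.util.concurrent.{ConcurrentLinkedDeque, CountDownLatch, TimeUnit}
Scheduler scheduler = Schedulers.computation();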
int maxValue = 10;
int maxAttempts = 1000;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
    ConcurrentLinkedDeque<Notification<Integer>> sink = new ConcurrentLinkedDeque<>();
    SequentialDisposable firstDisposable = new SequentialDisposable();
    CountDownLatch latch = new CountDownLatch(1);

    Observable<Integer> integers = Observable.range(0, maxValue);
    Observable<Notification<Integer>> first = integers.delay(5, TimeUnit.MICROSECONDS, scheduler, true)
            .materialize()
            .doOnNext(sink::add)
            .doOnSubscribe(firstDisposable::replace);
    Observable<?> second = integers.delay(5, TimeUnit.MICROSECONDS, scheduler)
            .doOnNext($ -> firstDisposable.dispose())
            .doOnTerminate(latch::countDown);

    first.subscribe();
    second.subscribe();
    latch.await();

    boolean foundTermination = false;
    int nextExpectedValue = 0;
    List<String> errors = new ArrayList<>();
    List<Notification<Integer>> notifications = List.copyOf(sink);
    for (int i = 0; i < notifications.size(); i++) {
        Notification<Integer> notification = notifications.get(i);
        if (notification.isOnComplete() || notification.isOnError()) {
            foundTermination = true;
        }
        if (notification.isOnNext()) {
            if (foundTermination) {
                errors.add("Found onNext after termination at index " + i);
            }
            int value = notification.getValue();
            if (value != nextExpectedValue) {
                errors.add((errors.size() + 1) + ". Wrong value at index " + i + " (expected " + nextExpectedValue +
                        ", was " + value + ")");
            }
            nextExpectedValue = value + 1;
        }
    }
    if (!errors.isEmpty()) {
        throw new AssertionError(new StringBuilder()
                .append("Attempt ")
                .append(attempt)
                .append(" failed.\n\t\tError(s):\n\t\t\t")
                .append(String.join("\n\t\t\t", errors))
                .append("\n\t\tNotifications: ")
                .append(notifications)
                .toString());
    }
}

Here is an example failure:

java.lang.AssertionError: Attempt 11 failed.
            Error(s):
        1. Wrong value at index 2 (expected 2, was 3)
        2. Wrong value at index 3 (expected 4, was 5)
        3. Wrong value at index 6 (expected 8, was 9)
        Notifications: [OnNextNotification[0], OnNextNotification[1], OnNextNotification[3], OnNextNotification[5], OnNextNotification[6], OnNextNotification[7], OnNextNotification[9]]

I understand that a subscription that is actively producing signals may continue to propagate those signals to the downstream when disposed of concurrently (i.e. disposal is "eventually consistent"). But I would expect that any signals emitted during or after the race would preserve their original ordering without gaps. In other words, once delay() has caught up to its newly disposed state and does not emit its Nth onNext signal, it seems wrong for it to go on to emit its (N+1)th signal.
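To make that expectation concrete, here is a minimal sketch of the invariant in plain Java, with no RxJava dependency. It assumes the source emits 0, 1, 2, ... as Observable.range(0, maxValue) does in the repro; the class name PrefixCheck and the method firstGapIndex are hypothetical names introduced only for this illustration. Whatever subset of values reaches the downstream before disposal takes effect, it should be a contiguous prefix of the source.

```java
import java.util.List;

// Hypothetical helper, not part of RxJava: checks the invariant described above.
final class PrefixCheck {

    // Returns the index of the first value that breaks the 0, 1, 2, ... sequence,
    // or -1 if the observed values form a contiguous prefix of the source.
    static int firstGapIndex(List<Integer> observed) {
        for (int i = 0; i < observed.size(); i++) {
            if (observed.get(i) != i) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Values taken from the example failure above.
        List<Integer> failing = List.of(0, 1, 3, 5, 6, 7, 9);
        System.out.println(firstGapIndex(failing)); // prints 2

        // An early cut-off is acceptable: disposal may stop emission at any point.
        List<Integer> truncated = List.of(0, 1, 2);
        System.out.println(firstGapIndex(truncated)); // prints -1
    }
}
```

Unlike the checker in the repro, which resynchronizes after each mismatch and so reports three errors for the example failure, this sketch stops at the first gap.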
