java - 在 doOnSubscribe 中调用 subject.onNext()

标签 java rx-java2

为什么在 doOnSubscribe 内调用 subject.onNext(o) 没有任何效果,但是调用 subject.onComplete() 会导致流终止!?

final PublishSubject<Integer> subject = PublishSubject.create();

    final Observable<Integer> observable = subject.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            System.out.println("disposable = [" + disposable + "]");
            subject.onNext(1);
            //or
            Observable.just(2, 3).subscribe(subject);
        }
    });

    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("d = [" + d.isDisposed() + "]");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("item = [" + integer + "]");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("e = [" + e + "]");
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });/*
    expected:
    disposable = [false]
    d = [false]
    item = 1
    item = 2
    item = 3
    onComplete
    but received :
    disposable = [false]
    d = [false]
    onComplete
    */

最佳答案

订阅 Subject 时在 2.x 中,Disposable代表连接正在遍历onSubscribe()特定之前的链条Observer onNext 变得可见。如果您调用 hasObservers 就可以看到这一点来自onSubscribe它将返回 false 直到 onSubscribe实际上返回。

这是 Observable 协议(protocol)所必需的,因为它不允许运行 onSubscribeonNext同时和onSubscribe必须发生在onNext之前。如果不遵守此规则,则并发调用 Subject.onNext会在 Observer.onSubscribe 之前甚至同时运行打电话并找到可能没有准备好的消费者。

PublishSubject不保留任何onNext未观察到的调用 onNext元素被丢弃。根据用例,您应该使用 BehaviorSubjectyourSubject.startWith(initialValue).subscribe()在任何其他 onNext 之前获取一个值来自Subject .

关于java - 在 doOnSubscribe 中调用 subject.onNext(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43253399/

相关文章:

android - RxJava - 将列表的结果映射到另一个列表

android - 在 Android 上使用架构组件将 RxJava 代码转换为新代码

java - "No Instance of type variable R exist so that Observable conforms to Observable"更新到 RxJava2 时出错

java - 使用特定模式重新格式化字符串

java - 参数和参数有什么区别?

java - IntelliJ IDEA 中的 JTS 拓扑套件

java - 如何记录 rxJava2 的所有异常的堆栈跟踪?

java - Scala中获取Json头节点值

java - java.util.Calendar 线程是否安全?

kotlin - 使用 RxJava 构建 MVI 循环 : how to replace BehaviorSubject with scan()