java - RxJava - 当同时使用observeOn和subscribeOn时,ConnectableObservable无法通知其观察者超过128次

标签 java reactive-programming rx-java

我有一个使用运行很长时间的 ConnectableObservable 的应用程序。神秘的是,一段时间后,它的观察者停止在其 onNext() 方法中接收通知。

我编写了以下测试来简化示例。它只是一个具有无限循环的 ConnectableObservable,其中一个订阅者同时使用 watchOn 和 subscribeon。调用 128 s.onNext(1) 后,它会停止通知观察者。

@Test
public void testHotObservable() throws InterruptedException{

    CountDownLatch latch = new CountDownLatch(1);

    ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
        while(true){
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(1);
        }
    })
    .observeOn(Schedulers.io())
    .subscribeOn(Schedulers.io())
    .publish();

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onNext(Integer i) {
            System.out.println("got "+i);
        }
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    };

    observable.subscribe(observer);
    observable.connect();

    latch.await();
}

这是我调试 RxJava 代码时看到的,我找到了它不调用 Observer 的 onNext() 方法的原因,但我不明白:

1.- s.onNext(1); 被调用:

2.- 执行到达rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue():

void pollQueue() {
    int emitted = 0;
    final AtomicLong localRequested = this.requested;
    final AtomicLong localCounter = this.counter;
    do {
        localCounter.set(1);
        long produced = 0;
        long r = localRequested.get();            
        for (;;) {
            ...
            System.out.println("R: "+r);
            if (r > 0) {
                Object o = queue.poll();
                if (o != null) {
                    child.onNext(on.getValue(o));
                    r--;

问题在于 r 的值。第一次执行时,其值始终为 128。每次调用后,它都会减 1 (r--)。这意味着当同时使用observeOn和subscribeOn时,ConnectableObservable只能通知其观察者128次。如果我删除 subscribeOn,r 的值将从每次迭代开始并且有效。

更新:

我找到了解决方案:问题是由.observerOn().subscribeOn()的顺序引起的。如果我将其反转为 .subscribeOn().observeOn() ,它就会起作用(我可以看到 r 的值始终重置为 128)。

无论如何,我希望得到一个解释。

最佳答案

许多异步运算符使用内部固定大小的缓冲区,并依赖于订阅者的频繁请求。就你而言,有些东西没有正确要求,我无法说出它是什么。我建议使用标准组件尝试您的用例,看看可能出了什么问题,即您可以用 PublishSubject + 示例替换您的自定义 Observable:

Subject<Integer, Integer> source = PublishSubject.<Integer>create().toSerialized();

ConnectableObservable<Integer> co = source.sample(
    500, TimeUnit.MILLISECONDS, Schedulers.io())
.onBackpressureBuffer().publish();

co.subscribe(yourSubscriber);
co.connect();

source.onNext(1);

关于java - RxJava - 当同时使用observeOn和subscribeOn时,ConnectableObservable无法通知其观察者超过128次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34749534/

相关文章:

java - RxJava : How to get all results AND errors from an Observable

R Shiny : Observe only works once

android - Realm 自行关闭

java - .timeout 与 RxJava 观察者

java - RxJava - 观察可能总是变化的数据

Java 语法 >>, &, ?和 :

java - JUnit:使用构造函数而不是@Before

java - 如何在Java中使用Unix纪元每分钟打印数据

java - jetty 9 websockets onFrame 事件

rx-java - RxJava - 如何获取列表的第一个元素并将其作为 Observable 返回