java - 如何持续观察对象直到 onError()/取消订阅

标签 java asynchronous reactive-programming rx-java observable

我需要观察对象列表,并在列表中出现新对象时立即发出事件。就像客户端-服务器应用程序一样,客户端将内容添加到列表中,服务器将新添加的内容发送给在服务器中注册的所有客户端。

到目前为止,我的可观察对象仅发出一项,之后不再发出任何其他内容。换句话说,当一个新对象添加到列表中时,会发生一个事件,但当添加第二个(或第三个、第四个、第五个……)对象时,不会发生事件。

一旦列表中有新对象,我需要它继续发出项目。

这是我的代码,其中 MyList 是我自己的 List 实现(与我的问题无关)。我希望发出的事件是新添加到 MyList 的对象。

private void onClickMethod() {
    MyList myList = populateMyList();

    onNexAction = new Action1<MyList>() {
        @Override
        public void call(MyList myList) {
            System.out.println("call()");
            TheObject theObject = myList.getNext();
            System.out.println("New Event: " + theObject.getData());
        }
    };

    myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
        @Override
        public void call(Subscriber<? super MyList> t) {
            t.onNext(myList);
        }
    });

    myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}

上面的代码只会发出第一个添加的对象。但是如果我在 call() 中放置一个 while 循环,它将发出所有事件,就像我想要的那样。但我觉得这不是做我想做的事的适当方式。我觉得我更像是轮询而不是异步。

这是完全相同的代码,但带有 while 循环:

private void onClickMethod() {
    MyList myList = populateMyList();

    onNexAction = new Action1<MyList>() {
        @Override
        public void call(MyList myList) {
            System.out.println("call()");
            while (myList.size() > 0) { // The while cycle
                TheObject theObject = myList.getNext();
                System.out.println("New Event: " + theObject.getData());
            }
        }
    };

    myObservable = Observable.create(new Observable.OnSubscribe<MyList>() {
        @Override
        public void call(Subscriber<? super MyList> t) {
            t.onNext(myList);
        }
    });

    myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).subscribe(onNexAction);
}

那么,我在这里做错了什么?一旦 MyList 中有新对象,如何让我的可观察对象立即发出新事件?

编辑:

根据 Tassos Bassoukos 的建议,我应用了 distinct() 运算符,如下所示:

myObservable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io()).distinct().subscribe(onNexAction);

但这并不能解决我的问题。仅发出第一个添加的对象。

最佳答案

这是 PublishSubject 的典型案例:

class MyList<T> {
    final PublishSubject<T> onAdded = PublishSubject.create();
    void add(T value) {
        // add internally
        onAdded.onNext(value);
    }
    Observable<T> onAdded() {
        return onAdded;
    }
}

MyList<Integer> list = populate();

Subscription s = list.onAdded()
    .subscribe(v -> System.out.println("Added: " + v));

list.add(1);  // prints "Added: 1"
list.add(2);  // prints "Added: 2"
list.add(3);  // prints "Added: 3"

s.unsubscribe(); // not interested anymore

list.add(4);  // doesn't print anything
list.add(5);  // doesn't print anything

关于java - 如何持续观察对象直到 onError()/取消订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30584722/

相关文章:

c# - 执行多个CMD任务并异步更新

javascript - 异步 JavaScript : Trying to make AJAX API calls multiple times at interval?

rx-java - 如何在完成时刷新 Rx 缓冲区?

c# - Rx 通过加入一个属性来组合许多流

java - 从 map 同步设置还是从同步 map 设置?

某些设备上的 java.lang.ClassCastException

Java-类名以数字开头

c# - 异步和等待返回类型混淆

reactive-programming - 订阅后函数现在可以正常执行

java - SpringMVC ListBox人口