一点背景:
在这个链接中:https://github.com/ReactiveX/RxJava/issues/448 @ben-lesh 提出了使用 Observables 进行轮询的手动递归实现。
然而在最新的 RxJava 版本中没有 OnSubscribeFunc
。
这是我当前的实现:
Observable.create(new Observable.OnSubscribe<Item>() {
@Override
public void call(final Subscriber<? super Item> innerSubscriber) {
Schedulers.io().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
searchObservable()
.doOnNext(new Action1<Item>() {
@Override
public void call(Item item) {
innerSubscriber.onNext(item);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (throwable != null) {
innerSubscriber.onError(throwable);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
innerSubscriber.onCompleted();
}
}).subscribe(); // Set subscriber?
}
}, initialDelay, pollingInterval, TimeUnit.MINUTES);
}
})
.subscribeOn(Schedulers.io()) // performs networking on background thread
.observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
.subscribe(subscriber); // The subscriber
searchObservable
执行服务请求。这在第一次运行时运行良好,也就是说,数据被传递给 subscriber
。但是,等待 pollingInterval
后,数据返回并执行 doOnNext
但数据没有传递到 UI。我是否需要在 schedulePeriodically
采取的 Action
中设置任何订阅者?
最佳答案
它停止是因为您正在调用 innerSubscriber.onCompleted
,这会在第一次运行时终止序列。有一些标准运算符可以让您获得相同的效果,而无需创建自定义 Observable:
Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io())
.onBackpressureBuffer()
.concatMap(v -> searchObservable())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
(注意:这里不需要 subscribeOn(),因为 interval 无论如何都会在 Schedulers.io() 上发出。)
关于scheduled-tasks - 使用 Observables 进行轮询的手动递归的更新版本是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34454273/