scheduled-tasks - 使用 Observables 进行轮询的手动递归的更新版本是什么?

标签 scheduled-tasks reactive-programming rx-java observable rx-android

一点背景: 在这个链接中:https://github.com/ReactiveX/RxJava/issues/448 @ben-lesh 提出了使用 Observables 进行轮询的手动递归实现。 然而在最新的 RxJava 版本中没有 OnSubscribeFunc

这是我当前的实现:

Observable.create(new Observable.OnSubscribe<Item>() {
        @Override
        public void call(final Subscriber<? super Item> innerSubscriber) {

            Schedulers.io().createWorker()
                      .schedulePeriodically(new Action0() {
                          @Override
                          public void call() {
                              searchObservable()
                                      .doOnNext(new Action1<Item>() {
                                          @Override
                                          public void call(Item item) {
                                              innerSubscriber.onNext(item);
                                          }
                                      })
                                      .doOnError(new Action1<Throwable>() {
                                          @Override
                                          public void call(Throwable throwable) {
                                              if (throwable != null) {
                                                  innerSubscriber.onError(throwable);
                                              }
                                          }
                                      })
                                      .doOnCompleted(new Action0() {
                                          @Override
                                          public void call() {
                                              innerSubscriber.onCompleted();
                                          }
                                      }).subscribe(); // Set subscriber?
                          }
                      }, initialDelay, pollingInterval, TimeUnit.MINUTES);
        }
    })
            .subscribeOn(Schedulers.io()) // performs networking on background thread
            .observeOn(observeOnScheduler) // sends notifications to another Scheduler, usually the UI thread
            .subscribe(subscriber); // The subscriber

searchObservable 执行服务请求。这在第一次运行时运行良好,也就是说,数据被传递给 subscriber。但是,等待 pollingInterval 后,数据返回并执行 doOnNext 但数据没有传递到 UI。我是否需要在 schedulePeriodically 采取的 Action 中设置任何订阅者?

最佳答案

它停止是因为您正在调用 innerSubscriber.onCompleted,这会在第一次运行时终止序列。有一些标准运算符可以让您获得相同的效果,而无需创建自定义 Observable:

Observable.interval(initialDelay, pollingInterval, TimeUnit.MINUTES, Schedulers.io())
.onBackpressureBuffer()
.concatMap(v -> searchObservable())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);

(注意:这里不需要 subscribeOn(),因为 interval 无论如何都会在 Schedulers.io() 上发出。)

关于scheduled-tasks - 使用 Observables 进行轮询的手动递归的更新版本是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34454273/

相关文章:

java - 如何使用 switchIfEmpty RxJava

java - 响应式(Reactive)编程中的订阅是什么

Windows 批处理文件压缩,然后 ftp 文件

windows - 如何在系统启动后20分钟运行windows计划任务?

algorithm - 动态任务调度面试街

java - FlatMap 何时会同时监听多个源?

haskell - 为什么这个 Yampa 弹球会陷入死循环?

android - 我想在android中的不同日期(星期一,星期二等)的特定时间安排任务

c# - 在 UniRX 中将一个 Observable 映射到另一个不同类型的 Observable

java - 在 RxJava 中倒带一个 Observable?