java - RxJava 2 Observable onComplete 重新提交自身

标签 java observable rx-java2

我是 RxJava 新手。我正在尝试创建一个可观察的对象,当它完成时,它将重新开始,直到我调用 dispose,但是一段时间后我遇到了 OutofMemory 错误,下面是我正在尝试执行的操作的简化示例

  public void start() throws RuntimeException {
        log.info("\t * Starting {} Managed Service...", getClass().getSimpleName());

        try {

            executeObserve();

            log.info("\t * Starting {} Managed Service...OK!", getClass().getSimpleName());
        } catch (Exception e) {
            log.info("Managed Service {} FAILED! Reason is {} ", getClass().getSimpleName(), e.getMessage(), e);
        }
    }

start 在初始化阶段被调用一次,executeObserve 如下(简化形式..)。请注意,在 onComplete 上我“重新提交”executeObserve

public void executeObserve() throws RuntimeException {

        Observable<Book> booksObserve
                = manager.getAsObservable();

        booksObserve
                 .map(Book::getAllOrders)
                 .flatMap(Observable::fromIterable)
                 .toList()
                 .subscribeOn(Schedulers.io())
                 .subscribe(collectedISBN ->  
                      Observable.fromIterable(collectedISBN)
                       .buffer(10)
                       // ...some more steps here...
                       .toList()
                       .toObservable()
                       // resubmit
                      .doOnComplete(this::executeObserve)
                      .subscribe(validISBN -> {
                             // do something with the valid ones
                      })
             )
        );
    }

我的猜测是,如果我想重新提交任务但找不到任何文档,这不是可行的方法。

booksObserve 的实现如下

public Observable<Book> getAsObservable() {
    return Observable.create(e -> {
        try (CloseableResultSet<Book> rs = (CloseableResultSet<Book>) datasource.retrieveAll())) {
            for (Book r : rs) {
                e.onNext(r);
            }
            e.onComplete();
        } catch (Exception ex) {
            e.onError(ex);
        }
    });
}

在我们调用 dispose 或等效方法之前不断重新提交操作的正确方法是什么?我正在使用 RxJava 2

最佳答案

您创建了一个无休止的递归,循环将创建越来越多的资源,有时会出现 OutOfMemory/Stack 溢出异常。

为了重复Observable您应该使用的工作 repeat()运算符,它将重新订阅 Observable当它收到onComplete()时。

除此之外,还有一些关于您的代码的一般性评论:

  • 为什么要嵌套第二个 Observable在订阅者内部?你正在打破链条,你可以继续链条而不是创建新的 Observable在订阅者处。
  • 而且,看起来(假设 O bservable.fromIterable(collectedBets) 使用 collectedISBNonNext() o.w. 从哪里来?)您将所有项目收集到一个列表中,然后将其展平再次使用 from iterable,所以看起来你可以继续流,类似这样:

    booksObserve
       .map(Book::getAllOrders)
       .flatMap(Observable::fromIterable)
       .buffer(10)
       // ...some more steps here...
       .toList()
       .toObservable()
       // resubmit
       .doOnComplete(this::executeObserve)
       .subscribeOn(Schedulers.io())
       .subscribe(validISBN -> {
             // do something with the valid ones
        });       
    
  • 无论如何,使用嵌套的 Observablerepeat()运算符将仅重复嵌套的流,而不是整个流(这就是您想要的),因为它未连接到它。

关于java - RxJava 2 Observable onComplete 重新提交自身,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42890312/

相关文章:

angular - 在同一可观察对象上多次使用 `async` 管道

java - 将 ListenableFuture 链转换为等效的 RxJava 结构

java - 将异步计算的数据传播回 GUI 线程

java - 在 RxJava/RxAndroid 中的可观察对象之间传递响应

java - 在 Java/Android 中从哈希表中的对象分配 boolean 值

java - 为什么我得到可以由jdk8、10、11、12构建的pom,但不能由jdk9构建?

java - 如何在JAVA中更新jpg、tiff文件的元数据值?

java - 使用MikePenz抽屉,在哪里可以处理onDrawerOpen、关闭软键盘

Angular 5 HTTP响应未定义

service - 在 Angular 2 中使用服务单例作为数据源的示例