java - 发射后取消订阅间隔

标签 java rx-java rx-java2

我有一个代码,我在其中设置一个间隔,直到条件完成,然后在订阅中发回结果。

但由于是一个间隔,因此订阅会继续。

我想知道是否有任何方法可以在发出某些内容后取消订阅 Observable 间隔

这里是代码

Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
                .map(i -> eventHandler.getProcessedEvents())
                .filter(eventsProcessed -> eventsProcessed >= 10)
                .doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
                .doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
                .subscribe(t -> resumeRequest(asyncResponse));
        new TestSubscriber((Observer) subscriber).awaitTerminalEvent(10, TimeUnit.SECONDS);
subscriber.unsubscribe();

目前,作为黑客,我使用计时器,然后取消订阅,但这很糟糕!

问候

最佳答案

您可以使用first运算符

Subscription subscriber = Observable.interval(0, 5, TimeUnit.MILLISECONDS)
                .map(i -> eventHandler.getProcessedEvents())
                .first(eventsProcessed -> eventsProcessed >= 10)
                .doOnNext(eventsProcessed -> eventHandler.initProcessedEvents())
                .doOnNext(eventsProcessed -> logger.info(null, "Total number of events processed:" + eventsProcessed))
                .subscribe(t -> resumeRequest(asyncResponse));

而不是过滤器。这可确保您在满足条件时仅获得一次发射。请注意,如果您的条件间隔 Observable 在没有满足您的条件的情况下终止,您将收到异常。

关于java - 发射后取消订阅间隔,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44843670/

相关文章:

java - 如何在 Eclipse 中使用单个命令运行多个 Java 项目?

android - 为什么我的 RxJava Observable 没有完成?

android - 如何订阅点击事件以便异常情况不会取消订阅?

java - 在 RxJava 中,RxJavaPlugins.setErrorHandler 和 Subscribe onError 有什么区别?

rx-java2 - 具有不同调度程序的 flatMap 的行为

java - 如何将所有 OPTION 请求映射到游戏 2 中的某个 Controller ?

java - 在文本区域中显示图像

java - 如何对不同类类型的对象数组进行排序?

android - 我们应该为 Rxjava3 使用哪个 rxjava3 改造适配器

android - 如何返回错误值并使用计时器重试