java - 在 Observable 中超时时发出项目

标签 java reactive-programming rx-java2

我有两个 PublishSubject 来模拟一个队列,我在其中接收作业并将作业推送到其中。

我希望能够对在给定时间窗口(例如 10 秒)内从第一个 PublishSubject 消耗但未放入第二个 PublishSubject 的作业使用react:

final Subject<Job> queue = PublishSubject.<Job>create().toSerialized();
final Subject<Job> done = PublishSubject.<Job>create().toSerialized();
// this is probably wrong already since I am consuming items from queue
queue.subscribe(done::onNext);

final Observable<Job> timeOut = queue.timeout(10, SECONDS, Observable.empty()); // ??

最佳答案

您还没有描述 10 秒后您真正想要做什么。 您想再次将该作业重新添加到队列中吗?如果处理时间超过 10 秒,您是否只想获取信息,或者如果处理时间太长,您是否想跳过该作业?

如果您想跳过处理该作业并使用一些新的可观察值继续 您可以flatMap队列的每个元素并分别超时它们。

Observable<Job> observableThatWillTriggerOnTimeout = ...
queue.flatMap(job -> dispatchJob(job)
                    .timeout(10, TimeUnit.SECONDS, observableThatWillTriggerOnTimeout)
             )

详细说明您的情况,如有必要,我会更新我的答案

关于java - 在 Observable 中超时时发出项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54166603/

相关文章:

java - Spring Boot不会在MySQL中创建表

java - RX java 将可观察量与一对多关系相结合

.net - Reactive Extensions 基于特定数量的并行处理

java - 相当于 RxJava 的 RxJS.pausable(Observable)

android - 如何在 Kotlin android 改造中使用 enque 方法

java - 拍照后显示 fragment

java - 将数据保存到文件的最佳方法是什么?

java - 我怎样才能让我的复选框自行选中

java - 如何将 Observable<Observable<List<T>>> 转换为 Observable<List<T>>

android - RxJava2 .subscribeOn .observeOn 困惑。在主线程上运行