我有两个 PublishSubject
来模拟一个队列,我在其中接收作业并将作业推送到其中。
我希望能够对在给定时间窗口(例如 10 秒)内从第一个 PublishSubject
消耗但未放入第二个 PublishSubject
的作业使用react:
final Subject<Job> queue = PublishSubject.<Job>create().toSerialized();
final Subject<Job> done = PublishSubject.<Job>create().toSerialized();
// this is probably wrong already since I am consuming items from queue
queue.subscribe(done::onNext);
final Observable<Job> timeOut = queue.timeout(10, SECONDS, Observable.empty()); // ??
最佳答案
您还没有描述 10 秒后您真正想要做什么。 您想再次将该作业重新添加到队列中吗?如果处理时间超过 10 秒,您是否只想获取信息,或者如果处理时间太长,您是否想跳过该作业?
如果您想跳过处理该作业并使用一些新的可观察值继续
您可以flatMap
队列的每个元素并分别超时
它们。
Observable<Job> observableThatWillTriggerOnTimeout = ...
queue.flatMap(job -> dispatchJob(job)
.timeout(10, TimeUnit.SECONDS, observableThatWillTriggerOnTimeout)
)
详细说明您的情况,如有必要,我会更新我的答案
关于java - 在 Observable 中超时时发出项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54166603/