在我的代码中,我有一些事件需要按顺序处理,而其他事件则需要并行处理。
我成功地完成了并行部分,但无法完成顺序版本。
event.ofType(Message.class)
.groupBy(Message::getGroup)
.flatMap(g -> {
if (g.getKey() == Message.GROUP_PARALLEL)
return g.flatMap(m -> Observable.just(m)
.observeOn(Schedulers.computation())
.doOnNext(this::send)
.delay(m.getRetryInterval(), TimeUnit.SECONDS)
.repeat(m.getRetryCount())
.takeUntil(event.ofType(Sent.class)
.filter(a -> a.getId() == m.getId())
)
.map(s -> m)
);
else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
// ?
}
})
我想这样实现:
When an event is received
If the previous event is still being processed, wait before emission
Process the event (blocking)
所以:
event.subscribeOn(Schedulers.computation())
.waitUntilPreviousComplete()
.flatMap(this::processEvent) // we assume it take 1 seconds to complete
.susbcribe(this::processed, this::onError);
event.onNext(Event.A)
event.onNext(Event.B)
event.onNext(Event.C)
应该结果:
12h00:00 processed call with event A
12h00:01 processed call with event B
12h00:02 processed call with event C
所以我想要一些看起来像队列的东西。我用 HashMap 和主题尝试了一些奇怪的事情,但我确信有一种干净的方法可以做到这一点。我怎样才能实现它?
最佳答案
听起来您正在寻找 concatMap
。该运算符将按顺序订阅上游发出的项目,并且在上一个项目完成之前不会继续处理下一个项目。所以你的示例代码将如下所示:
event.subscribeOn(Schedulers.computation())
.concatMap(this::processEvent)
.subscribe(this::processed, this::onError);
关于android - 等待上一个事件完成 Rx,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72044850/