android - 等待上一个事件完成 Rx

标签 android queue rx-java buffer reactive-programming

在我的代码中,我有一些事件需要按顺序处理,而其他事件则需要并行处理。

我成功地完成了并行部分,但无法完成顺序版本。

event.ofType(Message.class)
    .groupBy(Message::getGroup)
    .flatMap(g -> {
        if (g.getKey() == Message.GROUP_PARALLEL)
            return g.flatMap(m -> Observable.just(m)
                        .observeOn(Schedulers.computation())
                        .doOnNext(this::send)
                        .delay(m.getRetryInterval(), TimeUnit.SECONDS)
                        .repeat(m.getRetryCount())
                        .takeUntil(event.ofType(Sent.class)
                            .filter(a -> a.getId() == m.getId())
                        )
                        .map(s -> m)
                    );
        else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
            // ?
        }
    })

我想这样实现:

When an event is received
    If the previous event is still being processed, wait before emission
    Process the event (blocking)

所以:

    event.subscribeOn(Schedulers.computation())
        .waitUntilPreviousComplete()
        .flatMap(this::processEvent) // we assume it take 1 seconds to complete
        .susbcribe(this::processed, this::onError);

    event.onNext(Event.A)
    event.onNext(Event.B)
    event.onNext(Event.C)

应该结果:

    12h00:00 processed call with event A
    12h00:01 processed call with event B
    12h00:02 processed call with event C

所以我想要一些看起来像队列的东西。我用 HashMap 和主题尝试了一些奇怪的事情,但我确信有一种干净的方法可以做到这一点。我怎样才能实现它?

最佳答案

听起来您正在寻找 concatMap 。该运算符将按顺序订阅上游发出的项目,并且在上一个项目完成之前不会继续处理下一个项目。所以你的示例代码将如下所示:

event.subscribeOn(Schedulers.computation())
    .concatMap(this::processEvent)
    .subscribe(this::processed, this::onError);

关于android - 等待上一个事件完成 Rx,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72044850/

相关文章:

c++ - 试图理解 boost 队列示例

kotlin - 使用rxAndroid时如何获得改造响应?

安卓:开火xmpp

android - 为什么我的加载器被破坏了?

IOS 上的 Android post 请求

php - PHP/MySQL 中的任务队列 : Race Condition

php - file_get_contents 不起作用?

android - 在 RxJava2(Android) 中订阅与订阅?

android - Rxjava 和 Volley 请求

android - LogCat 中的 NullPointerException?