rx-java - RxJava - 与 switchMap() 运算符相反?

标签 rx-java reactive-programming

我想知道是否有一种方法可以组合现有的运算符来执行与 switchMap() 相反的操作.
switchMap()将追踪它收到的最新发射并取消任何 Observable它以前正在执行。假设我翻转了它,我想忽略所有进入 xxxMap() 的排放。当它忙于接收到的第一次发射时。它将继续忽略排放,直到它完成排放当前 Observable在它里面。然后它将处理它收到的下一个发射。

Observable.interval(1, TimeUnit.SECONDS)
        .doOnNext(i -> System.out.println("Source Emitted Value: " + i))
        .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation())) 
        .subscribe(i -> System.out.println("Subscriber received Value: " + i));

有没有办法做到这一点?在上面的例子中,如果 intensiveProcess()持续三秒,ignoreWhileBusyMap()将处理 0但可能会忽略排放 12来自 interval() 。然后它会处理 3但可能会忽略 45 , 等等...

最佳答案

当然,通过在处理完成后设置的 bool 值对值的处理进行门控:

AtomicBoolean gate = new AtomicBoolean(true);

Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
    if (gate.get()) {
        gate.set(false);

        return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
                .doAfterTerminate(() -> gate.set(true));
    } else {
        return Observable.empty();
    }
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

编辑

选择:
Observable.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(v -> {
    return Observable.just(v).delay(500, TimeUnit.MILLISECONDS);
}, 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

您可以更改 onBackpressureDroponBackpressureLatest立即继续使用最新值。

关于rx-java - RxJava - 与 switchMap() 运算符相反?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37171525/

相关文章:

java - 将 Akka 源代码转换为 RxJava2 Flowable?

android - 当 EditText 有文本时启用按钮 (RxAndroid)

javascript - 当一个流依赖于另一个流的值时该怎么办?

java - Concat 不同类型的 Observable

java - onError java.lang.NullPointerException : Attempt to invoke virtual method 'double java.lang.Double.doubleValue()' on a null object reference 错误

java - RxJava : Observable and default thread

java - 在 RxJava 中什么算作异步边界?

swift - RxSwift flatMap 最新一次无需处理

swift - RxSwift 在没有主题或变量的情况下保持字符串的状态

安卓 RxJava AlertDialog