我想知道是否有一种方法可以组合现有的运算符来执行与 switchMap()
相反的操作.switchMap()
将追踪它收到的最新发射并取消任何 Observable
它以前正在执行。假设我翻转了它,我想忽略所有进入 xxxMap()
的排放。当它忙于接收到的第一次发射时。它将继续忽略排放,直到它完成排放当前 Observable
在它里面。然后它将处理它收到的下一个发射。
Observable.interval(1, TimeUnit.SECONDS)
.doOnNext(i -> System.out.println("Source Emitted Value: " + i))
.ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation()))
.subscribe(i -> System.out.println("Subscriber received Value: " + i));
有没有办法做到这一点?在上面的例子中,如果
intensiveProcess()
持续三秒,ignoreWhileBusyMap()
将处理 0
但可能会忽略排放 1
和 2
来自 interval()
。然后它会处理 3
但可能会忽略 4
和 5
, 等等...
最佳答案
当然,通过在处理完成后设置的 bool 值对值的处理进行门控:
AtomicBoolean gate = new AtomicBoolean(true);
Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
if (gate.get()) {
gate.set(false);
return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
.doAfterTerminate(() -> gate.set(true));
} else {
return Observable.empty();
}
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);
编辑
选择:
Observable.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(v -> {
return Observable.just(v).delay(500, TimeUnit.MILLISECONDS);
}, 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);
您可以更改
onBackpressureDrop
至 onBackpressureLatest
立即继续使用最新值。
关于rx-java - RxJava - 与 switchMap() 运算符相反?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37171525/