rx-java - 如何使用 RxJava 从另一个流实现过滤器

标签 rx-java rx-java2

所以我有两个像这样的 Steam :

--1--2--3--4--5--6-|
-----A-----B-------|

我的目标是拥有这样的流

--1----3------5--6-|

我尝试过使用 takeUntil 或skipUntil 等运算符,但我无法生成有效的东西。你能给我一些帮助吗?

谢谢

最佳答案

一般来说,2和A之间存在重合的问题,你必须定义一个窗口,在这个窗口中第二个流可以阻止第一个流的值被发射。例如,这将等待每个项目 1 毫秒 (2.x):

import java.util.concurrent.TimeUnit;

import io.reactivex.*;
import io.reactivex.schedulers.TestScheduler;

public class Coincidence {

    public static void main(String[] args) {
        TestScheduler sch = new TestScheduler();

        Flowable.interval(100, TimeUnit.MILLISECONDS, sch)
        .onBackpressureBuffer()
        .compose(coincide(Flowable.interval(300, TimeUnit.MILLISECONDS, sch), sch))
        .take(7)
        .subscribe(v -> 
            System.out.printf("%d - %d%n", sch.now(TimeUnit.MILLISECONDS), v));

        sch.advanceTimeBy(1001, TimeUnit.MILLISECONDS);
    }

    static <T, U> FlowableTransformer<T, T> coincide(
            Flowable<U> other, Scheduler scheduler) {
        return f -> {
            return other.publish(g -> {
                return f.flatMap(v -> {
                    return Flowable.just(v)
                            .delay(1, TimeUnit.MILLISECONDS, scheduler)
                            .takeUntil(g)
                    ;
                }, 1)
                .takeUntil(g.ignoreElements())
                ;
            });
        };
    };
}

关于rx-java - 如何使用 RxJava 从另一个流实现过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43477014/

相关文章:

java - 如何让flatMap在后台线程执行

java - RxJava 将 Observable 与另一个可选的具有超时功能的 Observable 结合起来

retrofit2 - 在RXjava2中调用ObservableEmitter的onError时出现UndeliverableException

java - RxJava : behaviour of combineLatest

android - 将 Single<Boolean> 转换为 Boolean (Kotlin)

android - IllegalArgumentException : Could not locate call adapter for rx. 可观察的 RxJava,Retrofit2

java - 使用 RxJava (ReactiveX) 运行 Observable 需要多长时间?

android - 用于典型 REST 查询的 rxjava android 测试

java - react 流 : How to wait for all publishers, 按 key ?

android - DiffUtil 和 registerAdapterDataObserver