java - 与 Flux 相交操作 - Project Reactor

标签 java reactive-programming spring-webflux project-reactor

假设我有两个 var a = Flux.just("A", "B", "C")var b = Flux.just("B", "C", "D")
我希望能够将两个变量相交,结果应该相当于一个集合相交

类似 a.intersect(b)Flux.intersect(a, b)这将导致(通量)["B", "C"]
我找不到任何可以执行此操作的操作,有什么想法吗?

最佳答案

你可以像这样使用 join、filter、map 和 groupBy

    //Join fluxes in tuple
    a.join(b,s -> Flux.never(), s-> Flux.never(),Tuples::of)
            //Filter out matching
            .filter(t -> t.getT1().equals(t.getT2()))
            //Revert to single value
            .map(Tuple2::getT1)
            //Remove duplicates
            .groupBy(f -> f)
            .map(GroupedFlux::key)
            .subscribe(System.out::println);

结果是对每个订阅者进行单一订阅,并且还可以与欺骗者一起使用。

或者你可以编写自己的 intersect 方法
public <T> Flux<T> intersect(Flux<T> f1,Flux<T> f2){
    return f1.join(f2,f ->Flux.never(),f-> Flux.never(),Tuples::of)
            .filter(t -> t.getT1().equals(t.getT2()))
            .map(Tuple2::getT1)
            .groupBy(f -> f)
            .map(GroupedFlux::key);
}

//Use on it's own
intersect(a,b).subscribe(System.out::println)

//Or with existing flux
a.transform(f -> intersect(a,f)).subscribe(System.out::println)

关于java - 与 Flux 相交操作 - Project Reactor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62049249/

相关文章:

java - 将多个 Mono<List<Item>> 合并为一个

RxJS:我如何 "manually"更新 Observable?

RxJS groupBy 转多维数组

spring-webflux - Netty Http 客户端连接池

java - Google Map 在 ViewPager 的 fragment 内,将所有触摸事件保留在 Google Map 中

java - Android-Studio : Error: Could not create the Java Virtual Machine, 我无法测试我的应用程序

java - SLF4J 记录器不执行 {} 替换

java - for 循环类分配出错

spring - 如何使用 WebClient 压缩 JSON 请求正文?

java - 在 RestController 方法中获取 ServerWebExchange