java - 从项目 react 器中的通量中采样除第一个元素之外的所有元素

标签 java reactive-programming project-reactor

在项目 react 堆通量中有一个示例方法Flux#sample java doc 。它会更改通量,以便仅在指定周期结束时发出事件。

是否可以调整此行为并实现此目的:在第一个元素上 - 立即发出它,从第二个到最后开始延迟采样。基本上,我想从采样中排除第一个(且仅第一个)元素,以便无需初始等待即可发出它。

是否可以使用内置运算符来实现?如果没有那么有人知道如何解决这个问题吗?

这是我想要实现的一个最简单的示例:

Flux<String> inputFlux = Flux.just("first", "second", "third").delayElements(Duration.ofMillis(400));
Flux<String> transformed = /*do some magic with input flux*/;

StepVerifier.create(transformed)
    .expectNext("first")//first should always be emmited instantly
    //second arrives 400ms after first
    //third arrives 400ms after second
    .expectNoEvent(Duration.ofSeconds(1))
    .expectNext("third")//after sample period last received element should be received 
    .verifyComplete();

最佳答案

通过将源通量myFlux转换为热通量,您可以轻松实现这一点:

Flux<T> myFlux;
Flux<T> sharedFlux = myFlux.publish().refCount(2);

Flux<T> first = sharedFlux.take(1);
Flux<T> sampledRest = sharedFlux.skip(1).sample(Duration.ofMillis(whatever));

return Flux.merge(first, sampledRest);

关于java - 从项目 react 器中的通量中采样除第一个元素之外的所有元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60291038/

相关文章:

c# - 如何在设定时间后处理可观察对象?

spring-webflux - 正确登录响应式(Reactive)应用程序 - WebFlux

spring-boot - 哪个 CoroutineScope 用于 Spring Boot WebFlux 端点

spring - onErrorResume 和 doOnError 的区别

java - 如何在模型- View - Controller 中为模型设计接口(interface)?

java - 正确的属性方法

java - 使用 G1 GC 时如何知道何时调整堆大小?

java - 如何使用 Android Studio 删除部分 android 包名称?

javascript - 使用 RxJS 创建一个切换按钮

javascript - 如何确定 Observable.merge 中哪些 observable 发生了更改?