java - 如何缓冲 GroupedFlux 的 Flux?

标签 java rx-java reactive-programming rx-java2 project-reactor

我正在尝试在 Reactor 中的 Flux of Fluxes 上实现缓冲过程。内部通量上的每次发射均按某些属性进行分组,并在缓冲区持续时间到期后发射。

以下测试(简化以暴露问题)说明了所需的行为:

private static final long STEP_MILLIS = 50;

private static final Duration BUFFER_DURATION = Duration.ofMillis(STEP_MILLIS * 4);

@Test
public void testBuffer() throws Exception {
    List<List<String>> buffers = new ArrayList<>();

    UnicastProcessor<String> queue = UnicastProcessor.create();
    FluxSink<String> sink = queue.sink();

    Flux<Flux<String>> fluxOfFlux = queue.map(Flux::just);

    // Buffering
    fluxOfFlux.flatMap(Function.identity())
        .transform(this::buffer)
        .subscribe(buffers::add);

    sink.next("TEST 1");

    Thread.sleep(BUFFER_DURATION.toMillis() - STEP_MILLIS);  // One "step" before Buffer should close

    assertTrue(buffers.isEmpty());

    sink.next("TEST 2");

    assertTrue(buffers.isEmpty());

    Thread.sleep(STEP_MILLIS * 2); // One "step" after Buffer should close

    assertEquals(1, buffers.size());
    assertEquals(Arrays.asList("TEST 1", "TEST 2"), buffers.get(0));
}

public Flux<List<String>> buffer(Flux<String> publisher) {
    return publisher
        .groupBy(Object::getClass)
        .map(groupFlux -> groupFlux.take(BUFFER_DURATION).collectList())
        .flatMap(Function.identity());
}

该测试按书面方式进行;当我尝试将“flatMap”和“transform”操作合并为一个操作时,问题就出现了:

    // Buffering
    fluxOfFlux.flatMap(flux -> flux.transform(this::buffer))
        .subscribe(buffers::add);

然后测试失败,因为放入队列的每个项目都会立即作为单例“缓冲区”发出。

这是不可取的,因为在缓冲之前必须进行 flatMap 会消除内部 Flux 并行处理的能力。

这是用 Reactor 编写的,但应该使用 Observable 和类似名称的方法与 RxJava 进行一对一映射。

有人认为我可能做错了什么吗?

最佳答案

我仍在从 react 堆开始,但你的代码似乎不是没有副作用的。保持状态(在您的情况下使用 List<> 缓冲区)可能会在将来导致严重的错误。

您只需使用不可变即可实现相同的效果,如下所示

List<String> tests = Arrays.asList("TEST 1", "TEST 2", "TEST 3", "INTEGRATION 1", "INTEGRATION 2")

@Test
public void test() {
    Flux<List<String>> bufferedFlux = Flux.fromIterable(tests).buffer(Duration.ofSeconds(5));

    StepVerifier.withVirtualTime(() -> bufferedFlux)
            .expectNext(tests)
            .verifyComplete();
}

@Test
public void group() {
    Flux<GroupedFlux<String, String>> flux = Flux
            .fromIterable(tests)
            .groupBy(test -> test.replaceAll("[^a-zA-Z]", ""));

    StepVerifier.create(flux)
            .expectNextMatches(groupedFlux -> groupedFlux.key().equals("TEST"))
            .expectNextMatches(groupedFlux -> groupedFlux.key().equals("INTEGRATION"))
            .verifyComplete();


    Flux<String> sum = flux
            .flatMap(groupedFlux -> groupedFlux
                    .map(test -> test.replaceAll("[^\\d.]", ""))
                    .map(Integer::valueOf)
                    .reduce((t1, t2) -> t1 + t2)
                    .map(total -> groupedFlux.key() + " TOTAL: " + total)
            ).sort();

    StepVerifier.create(sum)
            .expectNext("INTEGRATION TOTAL: 3")
            .expectNext("TEST TOTAL: 6")
            .verifyComplete();
}

关于java - 如何缓冲 GroupedFlux 的 Flux?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44636672/

相关文章:

Java JFrame ComboBox情况

java - 免费添加 JTextFields 硬代码

rx-java - RxJava 的后备 Observable

android - 使用 RxJava 和 Android 的多个 API 调用

java - try-catch-finally 后返回

rx-java - RXJava/Kotlin - 链接单个结果为一个

scala - ReactiveMongo 是如何实现的,因此它被认为是非阻塞的?

java - RxJava 适合分支工作流程吗?

java - 将多个异步 Observable<List> 减少为一个 Observable<List>

java - 在32位浏览器中检测64位jre?