我正在尝试在 Reactor 中的 Flux of Fluxes 上实现缓冲过程。内部通量上的每次发射均按某些属性进行分组,并在缓冲区持续时间到期后发射。
以下测试(简化以暴露问题)说明了所需的行为:
private static final long STEP_MILLIS = 50;
private static final Duration BUFFER_DURATION = Duration.ofMillis(STEP_MILLIS * 4);
@Test
public void testBuffer() throws Exception {
List<List<String>> buffers = new ArrayList<>();
UnicastProcessor<String> queue = UnicastProcessor.create();
FluxSink<String> sink = queue.sink();
Flux<Flux<String>> fluxOfFlux = queue.map(Flux::just);
// Buffering
fluxOfFlux.flatMap(Function.identity())
.transform(this::buffer)
.subscribe(buffers::add);
sink.next("TEST 1");
Thread.sleep(BUFFER_DURATION.toMillis() - STEP_MILLIS); // One "step" before Buffer should close
assertTrue(buffers.isEmpty());
sink.next("TEST 2");
assertTrue(buffers.isEmpty());
Thread.sleep(STEP_MILLIS * 2); // One "step" after Buffer should close
assertEquals(1, buffers.size());
assertEquals(Arrays.asList("TEST 1", "TEST 2"), buffers.get(0));
}
public Flux<List<String>> buffer(Flux<String> publisher) {
return publisher
.groupBy(Object::getClass)
.map(groupFlux -> groupFlux.take(BUFFER_DURATION).collectList())
.flatMap(Function.identity());
}
该测试按书面方式进行;当我尝试将“flatMap”和“transform”操作合并为一个操作时,问题就出现了:
// Buffering
fluxOfFlux.flatMap(flux -> flux.transform(this::buffer))
.subscribe(buffers::add);
然后测试失败,因为放入队列的每个项目都会立即作为单例“缓冲区”发出。
这是不可取的,因为在缓冲之前必须进行 flatMap 会消除内部 Flux 并行处理的能力。
这是用 Reactor 编写的,但应该使用 Observable 和类似名称的方法与 RxJava 进行一对一映射。
有人认为我可能做错了什么吗?
最佳答案
我仍在从 react 堆开始,但你的代码似乎不是没有副作用的。保持状态(在您的情况下使用 List<> 缓冲区)可能会在将来导致严重的错误。
您只需使用不可变即可实现相同的效果,如下所示
List<String> tests = Arrays.asList("TEST 1", "TEST 2", "TEST 3", "INTEGRATION 1", "INTEGRATION 2")
@Test
public void test() {
Flux<List<String>> bufferedFlux = Flux.fromIterable(tests).buffer(Duration.ofSeconds(5));
StepVerifier.withVirtualTime(() -> bufferedFlux)
.expectNext(tests)
.verifyComplete();
}
@Test
public void group() {
Flux<GroupedFlux<String, String>> flux = Flux
.fromIterable(tests)
.groupBy(test -> test.replaceAll("[^a-zA-Z]", ""));
StepVerifier.create(flux)
.expectNextMatches(groupedFlux -> groupedFlux.key().equals("TEST"))
.expectNextMatches(groupedFlux -> groupedFlux.key().equals("INTEGRATION"))
.verifyComplete();
Flux<String> sum = flux
.flatMap(groupedFlux -> groupedFlux
.map(test -> test.replaceAll("[^\\d.]", ""))
.map(Integer::valueOf)
.reduce((t1, t2) -> t1 + t2)
.map(total -> groupedFlux.key() + " TOTAL: " + total)
).sort();
StepVerifier.create(sum)
.expectNext("INTEGRATION TOTAL: 3")
.expectNext("TEST TOTAL: 6")
.verifyComplete();
}
关于java - 如何缓冲 GroupedFlux 的 Flux?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44636672/