java - 在 Webclient Post 之前检测到空的 Flux 窗口

标签 java spring-webflux project-reactor reactive-streams

引用我之前的问题Splitting a WebClient Post of a Streaming Flux into JSON Arrays ,我正在使用;

myFlux
 .window(5)
 .flatMap(window -> client
  .post()
  .body(window, myClass.class)
  .exchange()
  .flatMap(response -> response.bodyToMono)
 )
 .subscribe();

这很好用。但是,在速度慢的一天,5 条消息需要一段时间才能到达,并且在 window 已满之前,window 不会发送任何内容。所以 我切换到 windowTimeout(5, Duration.ofSeconds(5))

现在,如果没有数据并且超过了 Duration,则代码将传播一个空的 window,这会导致发布一个空数组。

如何检测空窗口并且不运行post

最佳答案

不幸的是,如果不完整读取整个 Flux,就无法知道 Flux 将发出多少个项目。

由于您的窗口尺寸相对较小,您可以使用 .collectList() 将 Flux 发出的所有项目收集到 List 中,然后检查是否有在发送请求之前列表为空。

myFlux
    .windowTimeout(5, Duration.ofSeconds(5))
    .flatMap(window ->
        // collect everything in the window into a list
        window.collectList()
             // ignore empty windows
            .filter(list -> !list.isEmpty())
             // send the request
            .flatMap(list -> client
                .post()
                .body(Flux.fromIterable(list), MyClass.class)
                .exchange()
                .flatMap(response -> response.bodyToMono(MyResponse.class))))

关于java - 在 Webclient Post 之前检测到空的 Flux 窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56437344/

相关文章:

java - 如何将 Flux<List<T>> 扁平化为 Flux<T>?

Java、maven、hibernate - 映射问题

java - 使用Java在linux系统上获取默认终端应用程序

java - 如何在 Spring Webflux 中获取引荐来源网址?

spring-webflux - 为什么 Mono 不能持有 NULL

kotlin - Reactor - 如果没有通过过滤器,如何使用 filterWhen 并仍然传递用于记录目的的值?

spring-boot - 响应式(Reactive) Elasticsearch 和分页

java - 如何使用 PDFBox 将 XFDF 注释导入 PDF

java - 使用 spark-submit 和 java -cp 运行 spark 应用程序时有什么区别?

java - Flux 中 Mono 的 doOnSuccess 方法相当于什么?