java - 将 Streaming Flux 的 WebClient Post 拆分为 JSON 数组

标签 java spring-webflux project-reactor reactive-streams

我正在使用第三方 REST Controller ,它接受 JSON 对象数组并返回单个对象响应。当我使用有限的 Flux 从 WebClient 进行 POST 时,代码可以正常工作(我假设是因为 Flux 已完成)。

但是,当Flux可能是无限的时,我该怎么办;

  1. 以数组 block 形式发布?
  2. 捕获每个 POSTed 数组的响应?
  3. 停止Flux的传输?

这是我的 bean ;

public class Car implements Serializable {

    Long id;

    public Car() {}
    public Car(Long id) { this.id = id; }
    public Long getId() {return id; }
    public void setId(Long id) { this.id = id; }
}

这就是我假设第三方客户端的样子;

@RestController
public class ThirdPartyServer {

    @PostMapping("/cars")
    public CarResponse doCars(@RequestBody List<Car> cars) {
        System.err.println("Got " + cars);
        return new CarResponse("OK");
    }
}

这是我的代码。当我 POST flux2 时,完成后会发送一个 JSON 数组。但是,当我 POST flux1 时,在第一个 take(5) 之后没有发送任何内容。如何 POST 接下来的 5 个 block ?

@Component
public class MyCarClient {

    public void sendCars() {

//      Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
        Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));

        WebClient client = WebClient.create("http://localhost:8080");
        client
            .post()
            .uri("/cars")
            .contentType(MediaType.APPLICATION_JSON)
            .body(flux2, Car.class) 
//          .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
            .exchange()
            .subscribe(r -> System.err.println(r.statusCode()));
    }
}

最佳答案

  1. How do I POST in chunks of arrays?

使用 Flux.window 的变体之一将主通量拆分为窗口通量,然后通过 .flatMap

使用窗口通量发送请求
        Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));

        WebClient client = WebClient.create("http://localhost:8080");
        Disposable disposable = flux1
                // 1
                .window(5)
                .flatMap(windowedFlux -> client
                        .post()
                        .uri("/cars")
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(windowedFlux, Car.class)
                        .exchange()
                        // 2
                        .doOnNext(response -> System.out.println(response.statusCode()))
                        .flatMap(response -> response.bodyToMono(...)))
                .subscribe();

        Thread.sleep(10000);

        // 3
        disposable.dispose();

  1. How do I capture the response, per POSTed array?

您可以在.exchange()之后通过运算符分析响应。

在我提供的示例中,可以在 doOnNext 运算符中看到响应,但您可以使用对 onNext 信号进行操作的任何运算符,例如 map 句柄

请务必完整阅读响应正文,以确保连接返回到池中(请参阅 note )。在这里,我使用了 .bodyToMono,但任何 .body.toEntity 方法都可以工作。

  1. Stop the transmission of the Flux?

当使用 subscribe 方法时,您可以使用返回的 disposable.dispose() 停止流程。

或者,您可以从 sendCars() 方法返回 Flux,并将订阅和处置委托(delegate)给调用者。

请注意,在我提供的示例中,我只是使用 Thread.sleep() 来模拟等待。在实际应用程序中,您应该使用更高级的东西,并避免 Thread.sleep()

关于java - 将 Streaming Flux 的 WebClient Post 拆分为 JSON 数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55523944/

相关文章:

java - 防止连续出现相同的随机输出

带有 Springs ResponseEntity 的 Kotlin 和通用返回类型

java - FlatMap a Flux 未执行

netty - 为 Netty 服务器上的 Spring Reactive Web 应用程序定义最大并发用户数

java - 身份验证期间抛出并处理自定义异常 Spring Security + WebFlux

java - 如何正确管理 Reactor 中的可关闭资源

java - 如何使用 CSS 在 JavaFX2 中设置表格单元格的样式(不删除悬停/选择等)

java - 从 JSON 数组字符串创建 POJO 对象

java - Subject.toString() 返回主体列表但不返回经过身份验证的用户名

spring-webflux - WebClient 检索 NULL 作为响应主体