我正在使用第三方 REST Controller ,它接受 JSON 对象数组并返回单个对象响应。当我使用有限的 Flux
从 WebClient 进行 POST 时,代码可以正常工作(我假设是因为 Flux
已完成)。
但是,当Flux
可能是无限的时,我该怎么办;
- 以数组 block 形式发布?
- 捕获每个 POSTed 数组的响应?
- 停止
Flux
的传输?
这是我的 bean ;
public class Car implements Serializable {
Long id;
public Car() {}
public Car(Long id) { this.id = id; }
public Long getId() {return id; }
public void setId(Long id) { this.id = id; }
}
这就是我假设第三方客户端的样子;
@RestController
public class ThirdPartyServer {
@PostMapping("/cars")
public CarResponse doCars(@RequestBody List<Car> cars) {
System.err.println("Got " + cars);
return new CarResponse("OK");
}
}
这是我的代码。当我 POST flux2
时,完成后会发送一个 JSON 数组。但是,当我 POST flux1
时,在第一个 take(5)
之后没有发送任何内容。如何 POST 接下来的 5 个 block ?
@Component
public class MyCarClient {
public void sendCars() {
// Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));
WebClient client = WebClient.create("http://localhost:8080");
client
.post()
.uri("/cars")
.contentType(MediaType.APPLICATION_JSON)
.body(flux2, Car.class)
// .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
.exchange()
.subscribe(r -> System.err.println(r.statusCode()));
}
}
最佳答案
- How do I POST in chunks of arrays?
使用 Flux.window
的变体之一将主通量拆分为窗口通量,然后通过 .flatMap
Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
WebClient client = WebClient.create("http://localhost:8080");
Disposable disposable = flux1
// 1
.window(5)
.flatMap(windowedFlux -> client
.post()
.uri("/cars")
.contentType(MediaType.APPLICATION_JSON)
.body(windowedFlux, Car.class)
.exchange()
// 2
.doOnNext(response -> System.out.println(response.statusCode()))
.flatMap(response -> response.bodyToMono(...)))
.subscribe();
Thread.sleep(10000);
// 3
disposable.dispose();
- How do I capture the response, per POSTed array?
您可以在.exchange()
之后通过运算符分析响应。
在我提供的示例中,可以在 doOnNext
运算符中看到响应,但您可以使用对 onNext
信号进行操作的任何运算符,例如 map
或句柄
。
请务必完整阅读响应正文,以确保连接返回到池中(请参阅 note )。在这里,我使用了 .bodyToMono
,但任何 .body
或 .toEntity
方法都可以工作。
- Stop the transmission of the Flux?
当使用 subscribe
方法时,您可以使用返回的 disposable.dispose()
停止流程。
或者,您可以从 sendCars()
方法返回 Flux,并将订阅和处置委托(delegate)给调用者。
请注意,在我提供的示例中,我只是使用 Thread.sleep() 来模拟等待。在实际应用程序中,您应该使用更高级的东西,并避免 Thread.sleep()
关于java - 将 Streaming Flux 的 WebClient Post 拆分为 JSON 数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55523944/