java - 在 Spring Boot 客户端中接收 Flux

标签 java mongodb spring-boot project-reactor spring-webflux

这是 Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux? 的后续问题

我尝试遵循如何使用 Spring WebClient 接收 Flux 的建议,但实际上遇到了网络问题。

在服务器端,代码是一个简单的 Controller ,公开 Mongo 存储库的 findAll 方法:

public Flux<Power> getAll() {
    return powerRepository.findAll();


public CommandLineRunner readFromApiServer() {
    return new CommandLineRunner() {
        public void run(String... args) throws Exception {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));


2017-02-27 08:19:41.281 ERROR 99026 --- [ctor-http-nio-5] : [HttpClient] Error processing connection. Requesting close the channel

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 at io.netty.buffer.AbstractReferenceCountedByteBuf.release0( ~[netty-all-4.1.8.Final.jar:4.1.8.Final]

我正在使用当前的 Spring Boot 2.0.0 BUILD-SNAPSHOT。



所有 CommandLineRunner bean 都会在 Spring Boot 应用程序启动时执行。如果没有守护线程(即当前应用程序不是 Web 应用程序),则应用程序将在所有运行程序执行完毕后关闭。


此外,您没有对 HTTP 请求的结果执行任何操作。 我认为使用以下内容更新您的代码示例应该可以解决问题:

public CommandLineRunner readFromApiServer() {
    return new CommandLineRunner() {
        public void run(String... args) throws Exception {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));
            List<Power> powerList = flux.collectList().block();
            // do something with the result?

关于java - 在 Spring Boot 客户端中接收 Flux,我们在Stack Overflow上找到一个类似的问题:


caching - Spring Boot中如何防止缓存

java - 为什么方法声明中的顺序很重要?

java - 当有模型列表而不仅仅是 Java 中的模型时如何获取类

Java拉力赛休息API : How to add new user to the Project

javascript - 如何使用express.js和mongoclient将数组插入mongodb

java - 从Migrate迁移到Spring MVC 4 + Hibernate5

java - 所有AsyncTasks完成后如何执行onMapReady()回调?

mongodb - Spring数据MongoDb : MappingMongoConverter remove _class

python - 循环 Pymongo 游标在一些迭代后返回 bson.errors.InvalidBSON 错误

java - Spring-Boot/AMQP - 限制处理的消息数量