java - 服务器上生成的带有 MediaType.TEXT_EVENT_STREAM 的事件何时会传递给客户端上的订阅者

标签 java spring spring-webflux reactor-netty

我创建了一个示例客户端/服务器应用程序来熟悉 Spring Webflux/Reactor Netty。现在,当响应包含 Flux 并且媒体类型为“文本/事件流”时,我对客户端的行为有点困惑。我可以看到服务器上生成的每个元素都会立即发送到客户端,但尚未传递给订阅者。第一次交付给订阅者发生在服务器端的生产者完成 Flux 之后。 对我来说,这意味着所有元素首先会在客户端的reactor-netty 中的某个位置收集,直到获得完整/错误事件。

我的结论是正确的还是我可能做错了什么? 如果属实,这种情况在不久的将来会改变吗?根据我目前观察到的行为,使用 Spring Webflux 的大部分好处都被否定了,因为与 Spring Mvc 一样,消费者必须等到整个元素集合创建并传输后才能开始处理元素。

我的服务器应用程序是:`

@SpringBootApplication
public class ServerApp {
    public static void main(String[] args) {
        new SpringApplicationBuilder().sources(ServerApp.class).run(args);
    }

    @RestController
    public static class TestController {
        @GetMapping(value = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> testFlux() {
            class AsyncSink implements Consumer<SynchronousSink<String>> {
                private List<String> allStrings = List.of(
                        "Hello Flux1!",
                        "Hello Flux2!",
                        "Hello Flux3!",
                        "Hello Flux4!",
                        "Hello Flux5!");
                private int index = 0;

                @Override
                public void accept(SynchronousSink<String> sink) {
                    if (index == allStrings.size()) {
                        sink.complete();
                    }
                    else {
                        sink.next(allStrings.get(index++));
                    }
                }

            }

            return Flux.generate(new AsyncSink());
        }
    }
}

我的客户端应用程序是:

@SpringBootApplication
public class ClientApp {
    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext aContext = new SpringApplicationBuilder().web(WebApplicationType.NONE).sources(ClientApp.class).run(args);

        Flux<String> aTestFlux = aContext.getBean(TestProxy.class).getFlux();
        aTestFlux.subscribe(new TestSubscriber());

        System.out.println("Press ENTER to exit.");
        System.in.read();
    }

    @Bean
    public WebClient webClient() {
        return WebClient.builder().baseUrl("http://localhost:8080").build();
    }

    @Component
    public static class TestProxy {
        @Autowired
        private WebClient webClient;

        public Flux<String> getFlux() {
            return webClient.get().uri("/test").accept(MediaType.TEXT_EVENT_STREAM).exchange().flatMapMany(theResponse -> theResponse.bodyToFlux(String.class));
        }
    }

    private static class TestSubscriber extends BaseSubscriber<String> {
        @Override
        public void hookOnSubscribe(Subscription subscription) {
            System.out.println("Subscribed");
            request(Long.MAX_VALUE);
        }

        @Override
        public void hookOnNext(String theValue) {
            System.out.println(" - " + theValue);
            request(1);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("   done");
        }

        @Override
        protected void hookOnCancel() {
            System.out.println("   cancelled");
        }

        @Override
        protected void hookOnError(Throwable theThrowable) {
            theThrowable.printStackTrace(System.err);
        }
    }
}

当我访问网址http://localhost:8080/test时使用 Chrome 浏览器我看到:

data:Hello Flux1!

data:Hello Flux2!

data:Hello Flux3!

data:Hello Flux4!

data:Hello Flux5!

对我来说,看起来已经发送了 5 个 http 事件。

最佳答案

取自 react 性文档并重写以满足您的需求。

我的猜测是,在您的示例中,您已经向生成函数传递了一个消费者,该消费者在完成后将被发出。

改为使用方法 Flux#generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)您提供一个状态,其中将包含您想要发出的项目,然后在提供的 BiFunction 中您逐一发出每一项。

Flux<String> flux = Flux.generate(
    () -> List.of("1!", "2!", "3!", "4!", "5!"), 
    (state, sink) -> {
        if (index == allStrings.size()) {
            sink.complete();
        } else {
          sink.next(state.get(index++));
        } 
    });

我尚未测试在移动设备上编写的代码。

关于java - 服务器上生成的带有 MediaType.TEXT_EVENT_STREAM 的事件何时会传递给客户端上的订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57004002/

相关文章:

java - 将变量传递给java进程

java - Spring批处理远程分块和远程分区之间的区别

java - 在 flatMap 上使用reduce时,Reactor Flux订阅者流停止

java - Spring 网络流量 : how to configure Controller and WebClient to work like proxy?

java - 如何在Spring boot中修改Mono对象的属性而不阻塞它

java - xdoclet 与 xdoclet2?

java - 是否有任何 java 集合包装器/集合在多线程访问时失败?

java - 返回要使用 AudioTrack 播放的轨道的长度(以秒为单位)

java - 无法在本地 tomcat(spring boot 嵌入式)服务器上运行 Spring Boot 1.5.10.RELEASE

java - Spring安全吃Angularjs POST请求