java - 如何提出真正的通量请求?

标签 java spring-boot spring-webflux

我开始从 Spring-boot 学习 Webflux。我了解到,对于 RestController 的端点,您可以定义一个 Flux 请求主体,我希望在其中有一个真正的通量流,也就是说,整个请求的各个部分一个接一个地出现,并且这些部分也可以一个接一个地处理.但是,在使用客户端和服务器构建一个小示例后,我无法使其按预期工作。

这是服务器的片段:

@PostMapping("/digest")
    public Flux<String> digest(@RequestBody Flux<String> text) {
        continuousMD5.reset();
        return text.log("server.request.").map(piece -> continuousMD5.update(piece)).log("server.response.");
    }

注意:每段文本都会被发送到一个连续的MD5对象,该对象会累加所有的片段,并在每次累加后计算并返回中间MD5哈希值。流将在 MD5 计算之前和之后被记录。

这是客户端的片段:

@PostConstruct
    private void init() {
        webClient = webClientBuilder.baseUrl(reactiveServerUrl).build();
    }

@PostMapping(value = "/send", consumes = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> send(@RequestBody Flux<String> text) {
        return webClient.post()
            .uri("/digest")
            .accept(MediaType.TEXT_PLAIN)
            .body(text.log("client.request."), String.class)
            .retrieve().bodyToFlux(String.class).log("client.response.");
    }

注意:客户端接受一些文本的流量流并记录该流并将其发送到服务器(作为流量流)。

令人惊讶的是,我发送 REST 请求并让客户端通过以下命令行接收通量流:

for i in $(seq 1 100); do echo "The message $i"; done | http POST :8080/send  Content-Type:text/plain

我可以在客户端的日志中看到:

2019-05-09 17:02:08.604  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2019-05-09 17:02:08.606  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : request(1)
2019-05-09 17:02:08.649  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.650  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.674  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 2)
...
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 100)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.860  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.862  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onComplete()
^C2019-05-09 17:02:47.393  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : cancel()

每一段文本都被识别为流量流的一个元素,并被单独请求。

但是在服务器日志中:

2019-05-09 17:02:08.811  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.813  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onSubscribe(FluxMap.MapSubscriber)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(1)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.838  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onNext(The message 1The message 2The message 3The message 4The message 5The message 6The message 7The message 8The message 9The message 10The message 11The message 12The message 13The message 14The message 15The message 16The message 17The message 18The message 19The message 20The message 21The message 22The message 23The message 24The message 25The message 26The message 27The message 28The message 29The message 30The message 31The message 32The message 33The message 34The message 35The message 36The message 37The message 38The message 39The message 40The message 41The message 42The message 43The message 44The message 45The message 46The message 47The message 48The message 49The message 50The message 51The message 52The message 53The message 54The message 55The message 56The message 57The message 58The message 59The message 60The message 61The message 62The message 63The message 64The message 65The message 66The message 67The message 68The message 69The message 70The message 71The message 72The message 73The message 74The message 75The message 76The message 77The message 78The message 79The message 80The message 81The message 82The message 83The message 84The message 85The message 86The message 87The message 88The message 89The message 90The message 91The message 92The message 93The message 94The message 95The message 96The message 97The message 98The message 99The message 100)
2019-05-09 17:02:08.840  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onComplete()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : cancel()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : cancel()

我看到所有文本片段同时到达服务器,因此被作为流量流中的一个大元素处理(也可以验证只计算了一个 MD5 哈希值,而不是 100 个)。

我期望的是服务器也从客户端接收文本片段作为通量流中的元素,否则对于服务器来说它不是真正的 react ,而只是一个正常的阻塞请求。

任何人都可以帮助我了解如何使用 Webflux 发出真正的通量响应请求吗?谢谢!

更新

我使用类似的命令行向服务器发出 REST 请求,可以看到服务器接收到的文本片段(“消息 x”)作为通量流。所以我猜服务器没问题,现在问题可能出在客户端:如何使用 WebClient 发出真正的 flux REST 请求?

最佳答案

如果想实现流媒体效果,可以:

关于java - 如何提出真正的通量请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56109240/

相关文章:

java - Spring boot 不执行 schema.sql 脚本

java - 如何在 Spring WebFlux 的一个 Mono 中基于另一个请求执行连续的 Web 请求?

java - Java 中的 SSL 超重

java - 如果有更重要的消息进来,则添加到消息队列

java - 让斯坦福 NLP 识别具有多个单词的命名实体

java - 测试@NotNull 时集成测试失败

Java 反射 getDeclaredMethod(...) 返回 Null

java - 查询 DSL + Spring Data JPA 中的 Group by 抛出 NoSuchElementException

java - WebTestClient 返回线程不支持的 IllegalStateException : block()/blockFirst()/blockLast() are blocking,

java - 如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream