java - block()/blockFirst()/blockLast() 在调用 bodyToMono AFTER exchange() 时出现阻塞错误

标签 java reactive-programming spring-webflux project-reactor

我正在尝试使用 Webflux 将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,api 会返回成功,但在生成文件时会使用 DTO 详细说明错误,而不是文件本身。这是使用非常陈旧且设计不佳的 api,因此请原谅使用 post 和 api 设计。

api 调用 (exchange()) 的响应是一个 ClientResponse。从这里我可以使用 bodyToMono 转换为 ByteArrayResource,它可以流式传输到文件,或者,如果在创建文件时出错,那么我也可以使用 bodyToMono 转换为 DTO。但是,我似乎无法根据 ClientResponse header 的内容来做任何事情。

在运行时,我得到一个由

引起的 IllegalStateException

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-client-epoll-12

我认为我的问题是我不能在同一个函数链中调用 block() 两次。

我的代码片段是这样的:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> {
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                }
                else {
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                }
            }
        )
        .block();

基本上我想根据 header 中定义的 MediaType 以不同方式处理 ClientResponse。

这可能吗?

最佳答案

首先,有几件事可以帮助您理解解决此用例的代码片段。

  1. 永远不要在返回 react 类型的方法中调用阻塞方法;您将阻塞应用程序的几个线程之一,这对应用程序非常不利
  2. 无论如何从 Reactor 3.2 开始,blocking within a reactive pipeline throws an error
  3. 调用 subscribe ,正如评论中所建议的那样,也不是一个好主意。它或多或少类似于在单独的线程中将作业作为任务启动。当它完成时你会得到一个回调(subscribe 方法可以被赋予 lambda),但实际上你正在将你当前的管道与该任务解耦。在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可以关闭客户端 HTTP 响应并清理资源
  4. 如果您不想在内存中缓冲整个响应,Spring 提供了 DataBuffer (考虑可以合并的 ByteBuffer 实例)。
  5. 如果您正在实现的方法本身是阻塞的(例如返回 void),您可以调用 block,例如在测试用例中。

这是您可以用来执行此操作的代码片段:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> {
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) {
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            }
            else {
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            }
        });
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

如您所见,我们没有在任何地方阻塞,处理 I/O 的方法正在返回 Mono<Void> ,这是 react 等效的 done(error)当事情完成以及是否发生错误时发出信号的回调。

因为我不确定 createErrorFile 是什么方法应该可以,我已经为 createSpreadsheet 提供了一个示例只是将正文字节写入文件。请注意,由于数据缓冲区可能会被回收/合并,因此我们需要在完成后释放它们。

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) {
    try {
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
    } catch (IOException exc) {
        return Mono.error(exc);
    }
}

通过此实现,您的应用程序将包含几个 DataBuffer在给定时间内存中的实例( react 性运算符出于性能原因预取值)并且将以 react 性方式写入字节。

关于java - block()/blockFirst()/blockLast() 在调用 bodyToMono AFTER exchange() 时出现阻塞错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51449889/

相关文章:

java - vert.x:你如何正确发送一个post请求?

reactive-programming - 如何合并两个流(不包含null)并在对上应用条件?

javascript - 如何避免 Rx 中的故障

java - 如何在 spring-mvc 中向 webflux 端点添加日志记录?

Java - 用具体类型覆盖对象类型参数

java - 无法将当前目录添加到类路径

Java-如何从同一个包中的类获取私有(private)数据

javamail 无法读取多部分/混合邮件

spring-boot - 如何在 Spring Boot 2 + webflux 中正确生成 URI 的指标

spring - 如何在 Mono webflux 中返回 HttpStatus 204