spring - 使用 Spring Webflux 恢复文件下载以及 Spring 中的静态文件服务

标签 spring reactive-programming spring-webflux project-reactor spring-webclient

我相信整个互联网对此没有答案,因为它可能非常复杂,但我会继续询问。

基本上,我想在多个 Spring 应用程序之间进行交叉通信。他们每个人都以静态方式提供资源,here is the link for that topic 。此服务由其他应用程序实例利用,这些应用程序实例可以根据请求下载这些文件(目前我正在通过 HTTP 传输文件)。感谢 Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor 我能够下载文件所以问题。

现在我想提升我的代码,以便在出现连接问题或应用程序在给定超时时间内暂时不可用的情况下,我能够恢复文件的下载。我配置了 WebClient 超时,如 this article 所示。 .

现在,我认为这样的代码实际上可以让我处理暂时不可用的服务:

final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);

Flux<DataBuffer> fileData = Mono.just(filePath)
    .map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
    .map(bytes -> webClient
            .get()
            .uri(uri)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .header("Range", String.format("bytes=%d-", bytes))
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
            .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
            .bodyToFlux(DataBuffer.class)
    )
    .flatMapMany(Function.identity());

DataBufferUtils
        .write(fileData , fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .retry(3)
        .doOnComplete(() -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .doOnError(e -> !(e instanceof ChannelException), e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnError(ChannelException.class, e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnTerminate(() -> {
            try {
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
    .blockLast();

但显然,每当我终止应用程序的第二个实例时,我都会收到一堆错误:

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
2019-10-25T15:41:53.602+0200 [ERROR] [xxx] [N/A:N/A] [r.core.publisher.Operators] { thread=reactor-http-nio-4  } Operator called default onErrorDropped
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
    at reactor.core.Exceptions.bubble(Exceptions.java:154)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
    at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
    at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
    at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
    at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)

稍后在同一堆栈跟踪中:

2019-10-25T15:41:53.602+0200 [WARN] [xxx] [N/A:N/A] [i.n.c.AbstractChannelHandlerContext] { thread=reactor-http-nio-4  } An exception 'reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
    at reactor.core.Exceptions.bubble(Exceptions.java:154)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
    at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
    at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
    at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
    at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)

异常本身并不是什么大问题,但问题是,在我启动应用程序后,我的下载不会继续。

所以,是的,我的问题是,我如何才能恢复下载以及我应该/如何处理这里的异常?

最佳答案

该代码有 2 个问题。首先是在开始时确定文件有多大的方法。使用 AtomicLong 的正确方法就像在 DataBufferUtils.write() 中一样。第二个问题是远程调用之间的retry()。在处理具有固定退避的远程调用时,使用 retryWhen() 似乎是一种不错的方式。总而言之,下面的代码使我能够在出现连接/应用程序问题时从正确的字节恢复文件下载

final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);

AtomicLong fileSize = new AtomicLong(targetPath.toFile().length());

Flux<DataBuffer> fileDataStream = webClient
                .get()
                .uri(remoteXFerServiceTargetHost)
                .accept(MediaType.APPLICATION_OCTET_STREAM)
                .header("Range", String.format("bytes=%d-", fileSize.get()))
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
                .bodyToFlux(DataBuffer.class);

DataBufferUtils
        .write(fileData , fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .retryWhen(Retry.any().fixedBackoff(Duration.ofSeconds(5)).retryMax(5))
        .doOnComplete(() -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .doOnError(e -> !(e instanceof ChannelException), e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnError(ChannelException.class, e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnTerminate(() -> {
            try {
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
    .blockLast();

关于spring - 使用 Spring Webflux 恢复文件下载以及 Spring 中的静态文件服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58560204/

相关文章:

java - 如何在java中将JSON反序列化为类Object

java - spring-boot-devtools 重载多模块maven项目变更

ios - ReactiveCocoa - 停止命令

netty - 为 Netty 服务器上的 Spring Reactive Web 应用程序定义最大并发用户数

java - 带有文本/html 响应的响应式(Reactive) WebClient GET 请求

ssl - 无法使用 Spring Data Reactive 和 Spring Boot 2.0 连接到 mongoDB

java - Spring Boot邮件忽略application.properties

java - $Proxy309 在 eclipse 调试器中是什么意思

java - Mono.error 内的日志输出两次

java - 即使值存在,也无法单独从 Redis 加载值