spring - 从返回 future 的服务创建 Mono/Flux 的正确方法

标签 spring spring-boot mono completable-future spring-webflux

我如何正确处理从 futures 构建的 Monos?

我正在努力了解 Spring Reactive(和 Spring 5),观看所有视频并阅读我能找到的所有博客,但他们似乎都没有做一些比查询数据库稍微多一点的事情,或其他一些微不足道的事情。

我正在使用新的 AWS 2.0 SDK,它使用 CompletableFuture 来处理大多数事情。使用服务创建新实例,我的方法如下所示

public Mono<RunInstancesResponse> create(Instance instance) {
    RunInstancesRequest runInstancesRequest = RunInstancesRequest.builder()
            .instanceType(instance.getInstanceType())
            .imageId(instance.getImageId())
            .securityGroupIds(instance.getSecurityGroupIds())
            .keyName(instance.getKeyName())
            .minCount(1)
            .maxCount(1)
            .tagSpecifications(createTags(instance))
            .build();

    CompletableFuture<RunInstancesResponse> future = client.runInstances(runInstancesRequest);

    future.whenComplete((response, error) -> {
        response.reservation().instances().stream().map(aws -> Instance.builder()
                .imageId(aws.imageId())
                .build()
        ).forEach(instanceRepository::save);

    });

    return Mono.fromFuture(future);
}

我的理解是我几乎立即返回类型为 RunInstancesResponseMono,而 future.whenComplete 会做这件事每当。

我从我的路由处理程序中调用它,看起来像

public Mono<ServerResponse> create(ServerRequest request) {
    return request.bodyToMono(Instance.class)
            .flatMap(createService::create)
            .flatMap(i -> ServerResponse.accepted().build());
}

现在这几乎可以按我的预期工作,但是有几个关键问题我不知道如何解决。

1.) whenComplete 从未被调用,我相信这是因为我没有订阅它。

2.) 在 whenComplete 完成之前(大约 2.5 秒)服务器不会响应客户端,这并不理想,因为我希望它立即响应然后更新客户端当调用 whenComplete 时。

我感觉我的整个服务和处理程序的做事方式完全错误。

我喜欢一些示例,说明我应该如何处理服务中的 future,其中它是从具有 MonoFlux 类型的路由处理程序调用的。

最佳答案

我写了一个 open source library它包装了 SQS、SNS 和 DynamoDb,使这更容易一些。为避免仅链接答案,您可以对其应用以下内容:

public Mono<ChangeMessageVisibilityResult> changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
    return Mono.create(subscriber -> amazonClient.changeMessageVisibilityAsync(queueUrl, receiptHandle, visibilityTimeout, AmazonWebServiceRequestAsyncHandler.valueEmittingHandlerFor(subscriber)));
  }

传递的处理程序在两个世界之间转换:

public class AmazonWebServiceRequestAsyncHandler<RQ extends AmazonWebServiceRequest, RS> implements AsyncHandler<RQ, RS> {
    private final MonoSink<? super RS> subscriber;
    private boolean shouldEmitValue;

    private AmazonWebServiceRequestAsyncHandler(MonoSink<? super RS> subscriber, boolean shouldEmitValue) {
        this.subscriber = subscriber;
        this.shouldEmitValue = shouldEmitValue;
    }

    @Override
    public void onError(Exception exception) {
        subscriber.error(exception);
    }

    @Override
    public void onSuccess(RQ request, RS response) {
        if (shouldEmitValue) {
            subscriber.success(response);
        } else {
            subscriber.success();
        }
    }

    public static <RQ extends AmazonWebServiceRequest, RS> AsyncHandler<RQ, RS> valueEmittingHandlerFor(final MonoSink<? super RS> subscriber) {
        return new AmazonWebServiceRequestAsyncHandler<>(subscriber, true);
    }

    public static <RQ extends AmazonWebServiceRequest> AsyncHandler<RQ, Void> voidHandlerFor(MonoSink<? super Void> subscriber) {
        return new AmazonWebServiceRequestAsyncHandler<>(subscriber, false);
    }
}

关于spring - 从返回 future 的服务创建 Mono/Flux 的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48372032/

相关文章:

java - Spring Boot 自定义 Bean 加载器

C# FTP 上传在 Linux 环境下不太有效

java - 在构造函数上注入(inject) java.util.Random

java - java中application.properties文件的问题

spring - Grails 3.0.9 : Spock integration tests' @Rollback annotation is not working

asp.net-mvc - 是否有任何相当大的 Web 应用程序(例如 100 万页浏览量/月)在单声道上运行?

c# - P/调用 C# 到 C++

java - Apache 速度 : replace newline does not work

java - Spring boot注入(inject)mongodb数据源

spring-boot - EmbeddedServletContainerCustomizer 在 Spring Boot 2 中无法配置 session 超时