我如何正确处理从 futures 构建的 Monos?
我正在努力了解 Spring Reactive(和 Spring 5),观看所有视频并阅读我能找到的所有博客,但他们似乎都没有做一些比查询数据库稍微多一点的事情,或其他一些微不足道的事情。
我正在使用新的 AWS 2.0 SDK,它使用 CompletableFuture
来处理大多数事情。使用服务创建新实例,我的方法如下所示
public Mono<RunInstancesResponse> create(Instance instance) {
RunInstancesRequest runInstancesRequest = RunInstancesRequest.builder()
.instanceType(instance.getInstanceType())
.imageId(instance.getImageId())
.securityGroupIds(instance.getSecurityGroupIds())
.keyName(instance.getKeyName())
.minCount(1)
.maxCount(1)
.tagSpecifications(createTags(instance))
.build();
CompletableFuture<RunInstancesResponse> future = client.runInstances(runInstancesRequest);
future.whenComplete((response, error) -> {
response.reservation().instances().stream().map(aws -> Instance.builder()
.imageId(aws.imageId())
.build()
).forEach(instanceRepository::save);
});
return Mono.fromFuture(future);
}
我的理解是我几乎立即返回类型为 RunInstancesResponse
的 Mono
,而 future.whenComplete
会做这件事每当。
我从我的路由处理程序中调用它,看起来像
public Mono<ServerResponse> create(ServerRequest request) {
return request.bodyToMono(Instance.class)
.flatMap(createService::create)
.flatMap(i -> ServerResponse.accepted().build());
}
现在这几乎可以按我的预期工作,但是有几个关键问题我不知道如何解决。
1.) whenComplete
从未被调用,我相信这是因为我没有订阅它。
2.) 在 whenComplete
完成之前(大约 2.5 秒)服务器不会响应客户端,这并不理想,因为我希望它立即响应然后更新客户端当调用 whenComplete
时。
我感觉我的整个服务和处理程序的做事方式完全错误。
我喜欢一些示例,说明我应该如何处理服务中的 future,其中它是从具有 Mono
或 Flux
类型的路由处理程序调用的。
最佳答案
我写了一个 open source library它包装了 SQS、SNS 和 DynamoDb,使这更容易一些。为避免仅链接答案,您可以对其应用以下内容:
public Mono<ChangeMessageVisibilityResult> changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
return Mono.create(subscriber -> amazonClient.changeMessageVisibilityAsync(queueUrl, receiptHandle, visibilityTimeout, AmazonWebServiceRequestAsyncHandler.valueEmittingHandlerFor(subscriber)));
}
传递的处理程序在两个世界之间转换:
public class AmazonWebServiceRequestAsyncHandler<RQ extends AmazonWebServiceRequest, RS> implements AsyncHandler<RQ, RS> {
private final MonoSink<? super RS> subscriber;
private boolean shouldEmitValue;
private AmazonWebServiceRequestAsyncHandler(MonoSink<? super RS> subscriber, boolean shouldEmitValue) {
this.subscriber = subscriber;
this.shouldEmitValue = shouldEmitValue;
}
@Override
public void onError(Exception exception) {
subscriber.error(exception);
}
@Override
public void onSuccess(RQ request, RS response) {
if (shouldEmitValue) {
subscriber.success(response);
} else {
subscriber.success();
}
}
public static <RQ extends AmazonWebServiceRequest, RS> AsyncHandler<RQ, RS> valueEmittingHandlerFor(final MonoSink<? super RS> subscriber) {
return new AmazonWebServiceRequestAsyncHandler<>(subscriber, true);
}
public static <RQ extends AmazonWebServiceRequest> AsyncHandler<RQ, Void> voidHandlerFor(MonoSink<? super Void> subscriber) {
return new AmazonWebServiceRequestAsyncHandler<>(subscriber, false);
}
}
关于spring - 从返回 future 的服务创建 Mono/Flux 的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48372032/