java - Vertx Future 不会等待

标签 java reactive-programming rx-java vert.x vertx3

由于我在堆栈中使用 Vertx 3.1,所以我想使用该工具带来的 Future 功能,但在阅读完 API 后,它似乎对我来说非常有限。我什至找不到让 future 等待 Observable 的方法。 这是我的代码

          public Observable<CommitToOrderCommand> validateProductRestrictions(CommitToOrderCommand cmd) {
    Future<Observable<CommitToOrderCommand>> future = Future.future();
    orderRepository.getOrder(cmd, cmd.orderId)
                   .flatMap(order -> validateOrderProducts(cmd, order))
                   .subscribe(map -> checkMapValues(map, future, cmd));
     Observable<CommitToOrderCommand> result = future.result();
    if(errorFound){
        throw MAX_QUANTITY_PRODUCT_EXCEED.create("Fail"/*restrictions.getBulkBuyLimit().getDescription())*/);
    }
    return result;
}

private void checkMapValues(Multimap<String, BigDecimal> totalUnitByRestrictions, Future<Observable<CommitToOrderCommand>> future,
                            CommitToOrderCommand cmd) {
    for (String restrictionName : totalUnitByRestrictions.keySet()) {
        Restrictions restrictions = Restrictions.valueOf(restrictionName);
        if (totalUnitByRestrictions.get(restrictionName)
                                   .stream()
                                   .reduce(BigDecimal.ZERO, BigDecimal::add)
                                   .compareTo(restrictions.getBulkBuyLimit()
                                                          .getMaxQuantity()) == 1) {
            errorFound = true;
        }
    }
    future.complete(Observable.just(cmd));
}

在我的第一个 Observable 的 onComplete 中,我正在检查结果,完成后是我完成 future 以解锁操作的时间。 但我发现 future.result 不会被阻止,直到 future.complete 被调用,正如我所期望的那样。相反,只是返回 null。

知道这里出了什么问题吗?

问候。

最佳答案

vertx future 不会阻塞,而是与注入(inject)结果时调用的处理程序一起工作(请参阅 setHandlerisComplete )。

如果外层代码需要Observable,则不需要将其包装在Future中,只需返回 Observable<T>Future<Observable<T>>没有多大意义,您混合了两种执行异步结果的方法。

请注意,有多种方法可以将 Observable 折叠为 Future,但困难在于 Observable 可能会发出多个项目,而 Future 只能保存单个项目。您已经通过将结果收集到一次 map 发射中来解决这个问题。

从此Observable如果您想要 Future ,则只会发出一项你应该subscribe并调用 future.complete(yourMap)onNext方法。还定义 onError将调用 future.fail 的处理程序.

关于java - Vertx Future 不会等待,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34103604/

相关文章:

Java 反射调用带有通用参数的方法

java - 获取与预期不同的 http 状态代码

f# - 基于下游条件的 RX 过滤器

multithreading - 如何在不使用线程池执行器的情况下在单独的线程上异步执行可调用任务?

java - RxThreadFactory没有实现并发接口(interface)

rx-java - Java Spring WebFlux 与 RxJava

java - 空对象引用上的 android.content.Context.getPackageName()

java - 如何使用 DOM 级别 3 序列化 API 生成 DOCTYPE 声明?

android - 订阅 Android UI 线程

java - 如何模拟返回 `Mono<Void>` 的方法