java - 如何使用 Akka actors 处理 Java futures

标签 java asynchronous akka future

我在 Java 网络应用程序中有一个分层架构。 UI 层只是 Java,服务是类型化的 Akka actor,外部服务调用(WS、DB 等)包装在 Hystrix 命令中。

UI 调用服务,服务返回 Akka future 。这是一个 Akka future ,因为我想使用 Akka future 提供的 onComplete 和 onFailure 回调来简化 UI 编码。该服务然后创建执行某些映射等操作的 future ,并包装对返回 Java future 的 HystrixCommand 的调用。

所以在伪代码中:

界面

AkkaFuture future = service.getSomeData();

服务

public AkkaFuture getSomeData() {
    return future {
        JavaFuture future = new HystrixCommand(mapSomeData()).queue()
        //what to do here, currently just return future.get()
    }
}

问题是,我想释放服务参与者正在使用的线程,只占用 Hystrix 使用的线程。但是 java future 阻止了这种情况,因为我必须阻止它的完成。我能想到的唯一选择(我不确定自己是否喜欢)是不断轮询 Java future ,并在 Java future 完成时完成 Akka future 。

注意:这个问题与 Hystrix 本身并没有真正的关系,但我决定在有人提出与 Hystrix 特别相关的解决方案时提及它。

最佳答案

我将@Hbf 的答案标记为解决方案,因为我最终按照 How do I wrap a java.util.concurrent.Future in an Akka Future? 中的解释做了一个 Akka 轮询器。 .作为引用,我也尝试过:

  • 创建 HystrixCommandExcutionHook 并扩展 HystrixCommand 以允许回调。这没有用,因为没有在正确的时间调用 Hook 。
  • 使用 Guavas listenable future,方法是让经过修饰的执行程序在 Hystrix 中创建 future,然后从命令中转换 future。不起作用,因为 Hystrix 使用无法装饰的 ThreadPoolExecutor。

编辑:我在下面添加了 Akka 轮询器代码,因为最初的答案是在 Scala 中,如果 Java future 不能很好地取消,它就会挂起。下面的解决方案总是在超时后离开线程。


    protected  Future wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future javaFuture, final Option maybeTimeout, final ActorSystem actorSystem) {
      final Promise promise = Futures.promise();
        if (maybeTimeout.isDefined()) {
          pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem);
        } else {
          pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option. none(), actorSystem);
        }

        return promise.future();
    }

    protected  void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future javaFuture, final Promise promise, final Option maybeTimeout, final ActorSystem actorSystem) {
      if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) {
        // on timeouts, try to cancel the Java future and simply walk away
        javaFuture.cancel(true);
        promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get())));

      } else if (javaFuture.isDone()) {
        try {
          promise.success(javaFuture.get());
        } catch (final Exception e) {
          promise.failure(e);
        }
      } else {
            actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
          @Override
          public void run() {
            pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem);
          }
        }, actorSystem.dispatcher());
      }
    }

关于java - 如何使用 Akka actors 处理 Java futures,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14890724/

相关文章:

Java EntityManager 合并和 @PrePersist

具有离散值的 Java for 循环

java - 在 Java 中使用正则表达式解析时间

javascript - 将多个请求的结果保存到数组中

asp.net - 从异步调用加载 ASP.NET GridView

python - 没有 celery 的Django后台处理

java - C++ 调用 Java 的 JNI localrefs 的生命周期是多少?

java - 在 Akka Scheduler 中的特定日期安排作业

java - 类 akka.actor.TypedActor$MethodCall 无法使用修饰符 "public abstract"访问类 JobManager 的成员

scala - 如何检测 Akka Actor 何时完成?