我在 Java 网络应用程序中有一个分层架构。 UI 层只是 Java,服务是类型化的 Akka actor,外部服务调用(WS、DB 等)包装在 Hystrix 命令中。
UI 调用服务,服务返回 Akka future 。这是一个 Akka future ,因为我想使用 Akka future 提供的 onComplete 和 onFailure 回调来简化 UI 编码。该服务然后创建执行某些映射等操作的 future ,并包装对返回 Java future 的 HystrixCommand 的调用。
所以在伪代码中:
界面
AkkaFuture future = service.getSomeData();
服务
public AkkaFuture getSomeData() {
return future {
JavaFuture future = new HystrixCommand(mapSomeData()).queue()
//what to do here, currently just return future.get()
}
}
问题是,我想释放服务参与者正在使用的线程,只占用 Hystrix 使用的线程。但是 java future 阻止了这种情况,因为我必须阻止它的完成。我能想到的唯一选择(我不确定自己是否喜欢)是不断轮询 Java future ,并在 Java future 完成时完成 Akka future 。
注意:这个问题与 Hystrix 本身并没有真正的关系,但我决定在有人提出与 Hystrix 特别相关的解决方案时提及它。
最佳答案
我将@Hbf 的答案标记为解决方案,因为我最终按照 How do I wrap a java.util.concurrent.Future in an Akka Future? 中的解释做了一个 Akka 轮询器。 .作为引用,我也尝试过:
- 创建 HystrixCommandExcutionHook 并扩展 HystrixCommand 以允许回调。这没有用,因为没有在正确的时间调用 Hook 。
- 使用 Guavas listenable future,方法是让经过修饰的执行程序在 Hystrix 中创建 future,然后从命令中转换 future。不起作用,因为 Hystrix 使用无法装饰的 ThreadPoolExecutor。
编辑:我在下面添加了 Akka 轮询器代码,因为最初的答案是在 Scala 中,如果 Java future 不能很好地取消,它就会挂起。下面的解决方案总是在超时后离开线程。
protected Future wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future javaFuture, final Option maybeTimeout, final ActorSystem actorSystem) {
final Promise promise = Futures.promise();
if (maybeTimeout.isDefined()) {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem);
} else {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option. none(), actorSystem);
}
return promise.future();
}
protected void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future javaFuture, final Promise promise, final Option maybeTimeout, final ActorSystem actorSystem) {
if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) {
// on timeouts, try to cancel the Java future and simply walk away
javaFuture.cancel(true);
promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get())));
} else if (javaFuture.isDone()) {
try {
promise.success(javaFuture.get());
} catch (final Exception e) {
promise.failure(e);
}
} else {
actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() {
@Override
public void run() {
pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem);
}
}, actorSystem.dispatcher());
}
}
关于java - 如何使用 Akka actors 处理 Java futures,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14890724/