由回调驱动的 Java/Scala Future

标签 java scala playframework-2.0 akka nonblocking

简短版本:

如何创建 Promise<Result>哪个在回调触发时完成?

长版:

我正在开发一个处理第三方 SOAP 服务的应用程序。来自用户的请求同时委托(delegate)给多个 SOAP 服务,汇总结果并发回给用户。

系统需要可扩展,并且应该允许多个并发用户。由于每个用户请求最终会触发大约 10 个 Web 服务调用,并且每个调用会阻塞大约 1 秒,因此系统需要设计为非阻塞 I/O。

我在这个系统的 Play Framework (Java) 中使用 Apache CXF。我已设法生成异步 WS 客户端代理并启用异步传输。我无法弄清楚的是,当我委托(delegate)给多个 Web 服务代理并且结果将作为回调获得时,如何将 Future 返回给 Play 的线程。

选项 1:使用返回 Java Future 的异步方法调用。

如本 scala.concurrent.Future wrapper for java.util.concurrent.Future 中所述线程,我们无法将 Java Future 转换为 Scala Future。从 Future 获得结果的唯一方法是执行 Future.get()阻止调用者。由于 CXF 生成的代理返回 Java Future,因此排除了此选项。

选项 2:使用 Scala Future。

由于 CXF 生成代理接口(interface),我不确定是否有任何方法可以干预并返回 Scala Future(AFAIK Akka 使用 Scala Futures)而不是 Java Future?

选项 3:使用回调方法。

由 CXF 生成的返回 Java Future 的异步方法也采用回调对象,我想它会在结果准备好时提供回调。要使用这种方法,我需要返回一个 Future,它将等待我收到回调。

我认为选项 3 是最有前途的,尽管我不知道如何返回将在收到回调时完成的 Promise。我可能有一个线程在 while(true) 中等待并在中间等待直到结果可用。同样,我不知道如何进入 wait不阻塞线程?

简而言之,我正在尝试构建一个进行大量 SOAP Web 服务调用的系统,其中每个调用都会阻塞很长时间。在大量并发 Web 服务调用的情况下,系统可能很容易耗尽线程。我正在努力寻找一种基于非阻塞 I/O 的解决方案,它可以同时允许许多正在进行的 Web 服务调用。

最佳答案

选项 3 看起来不错 :) 一些导入开始......

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

并且,为了说明这一点,这里有一个接受回调的模拟 CXF API:

def fetch(url: String, callback: String => Unit) = {
  callback(s"results for $url")
}

创建一个promise,以promise作为回调调用API:

val promise = Promise[String]
fetch("http://corp/api", result => promise.success(result))

然后你可以将 promise.future 这是 Future 的一个实例带入你的 Play 应用中。

要测试它,您可以这样做:

Await.result(promise.future, Duration.Inf)

这将阻止等待结果,此时您应该在控制台中看到“http://corp/api 的结果”。

关于由回调驱动的 Java/Scala Future,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30956704/

相关文章:

java - Java 中的引用是如何工作的?

scala - 如何以功能风格进行文件创建和操作?

mysql - 如何使用play框架在scala中的数据库中插入主键值?

xml - 树而不是字符串的解析器组合器

java - 为什么 SimpleDateFormat 解析不正确的日期?

java - 在 Action 组合中获取 url 参数

playframework-2.0 - 如何正确设置sbt依赖?

java - swing - 为什么要创建两个单独的盒子

java - 庞大的数字列表中最常重复的数字

java - 将文本文件读入 ListView(应用程序添加到 ListView 两次)?