我需要使用 Akka 的 HTTP 客户端 (v2.0.2) 来使用 REST 服务。合乎逻辑的方法是通过主机连接池来做到这一点,因为我们期望有大量的同时连接。 Flow
为此消耗了 (HttpRequest, T)
并返回 (Try[HttpResponse, T)
. documentation表示某些任意类型 T
需要管理对请求的潜在无序响应,但没有指出调用者应该如何处理返回的 T
.
我的第一次尝试是使用以下函数 Int
如 T
.从许多地方调用它以确保连接使用单个池。
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
val unique = Random.nextInt
Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
case (Failure(f), `unique`) ⇒ Future.failed(f)
case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
}
}
问题是客户端应该如何使用这个
T
?有更清洁更有效的解决方案吗?最后,我对某些东西可能会出现故障的偏执狂实际上不是偏执狂吗?
最佳答案
最初我自己对此有点困惑,直到我通读了几次文档。如果您打算在池中使用单个请求,无论有多少不同的地方共享同一个池,T
您提供的(在您的情况下为 Int
)并不重要。因此,如果您使用的是 Source.single
一直以来,该 key 始终可以是 1
如果你真的想要。
但是,它确实发挥作用的地方是,如果一段代码要使用池并将多个请求一次提交到池中,并希望收到所有这些请求的响应。原因是响应按照从被调用的服务接收到的顺序返回,而不是按照它们提供给池的顺序返回。每个请求可能需要不同的时间,因此它们会向下游流向 Sink
按照他们从游泳池收到的顺序。
假设我们有一项接受 GET
的服务具有以下形式的网址的请求:
/product/123
凡
123
part 是您要查找的产品的 ID。如果我想查找产品 1-10
一次性完成,每个都有单独的请求,这是标识符变得重要的地方,以便我可以关联每个 HttpResponse
带有它所适用的产品 ID。此场景的简化代码示例如下:val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut:Future[Map[Int,HttpResponse]] =
Source(requests).
via(pool).
runFold(Map.empty[Int,HttpResponse]){
case (m, (util.Success(resp), id)) =>
m ++ Map(id -> resp)
case (m, (util.Failure(ex), i)) =>
//Log a failure here probably
m
}
当我在
fold
中收到回复时,我也很方便地拥有每个关联的 ID,因此我可以将它们添加到我的 Map
中。这是由 id 键控的。如果没有这个功能,我可能不得不做一些事情,比如解析主体(如果它是 json)来尝试找出哪个响应是哪个,哪个不理想,并且不包括失败的情况。在这个解决方案中,我知道哪些请求失败了,因为我仍然得到了标识符。我希望这可以为您澄清一些事情。
关于scala - Akka http客户端连接池的正确使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34872793/