scala - Akka http客户端连接池的正确使用

标签 scala akka-http

我需要使用 Akka 的 HTTP 客户端 (v2.0.2) 来使用 REST 服务。合乎逻辑的方法是通过主机连接池来做到这一点,因为我们期望有大量的同时连接。 Flow为此消耗了 (HttpRequest, T)并返回 (Try[HttpResponse, T) . documentation表示某些任意类型 T需要管理对请求的潜在无序响应,但没有指出调用者应该如何处理返回的 T .

我的第一次尝试是使用以下函数 IntT .从许多地方调用它以确保连接使用单个池。

val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))

def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
  val unique = Random.nextInt
  Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
    case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
    case (Failure(f), `unique`) ⇒ Future.failed(f)
    case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
  }
}

问题是客户端应该如何使用这个T ?有更清洁更有效的解决方案吗?最后,我对某些东西可能会出现故障的偏执狂实际上不是偏执狂吗?

最佳答案

最初我自己对此有点困惑,直到我通读了几次文档。如果您打算在池中使用单个请求,无论有多少不同的地方共享同一个池,T您提供的(在您的情况下为 Int)并不重要。因此,如果您使用的是 Source.single一直以来,该 key 始终可以是 1如果你真的想要。

但是,它确实发挥作用的地方是,如果一段代码要使用池并将多个请求一次提交到池中,并希望收到所有这些请求的响应。原因是响应按照从被调用的服务接收到的顺序返回,而不是按照它们提供给池的顺序返回。每个请求可能需要不同的时间,因此它们会向下游流向 Sink按照他们从游泳池收到的顺序。

假设我们有一项接受 GET 的服务具有以下形式的网址的请求:

/product/123

123 part 是您要查找的产品的 ID。如果我想查找产品 1-10一次性完成,每个都有单独的请求,这是标识符变得重要的地方,以便我可以关联每个 HttpResponse带有它所适用的产品 ID。此场景的简化代码示例如下:
val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut:Future[Map[Int,HttpResponse]] = 
  Source(requests).
    via(pool).
    runFold(Map.empty[Int,HttpResponse]){
      case (m, (util.Success(resp), id)) => 
        m ++ Map(id -> resp)

      case (m, (util.Failure(ex), i)) =>
        //Log a failure here probably
          m
    }

当我在 fold 中收到回复时,我也很方便地拥有每个关联的 ID,因此我可以将它们添加到我的 Map 中。这是由 id 键控的。如果没有这个功能,我可能不得不做一些事情,比如解析主体(如果它是 json)来尝试找出哪个响应是哪个,哪个不理想,并且不包括失败的情况。在这个解决方案中,我知道哪些请求失败了,因为我仍然得到了标识符。

我希望这可以为您澄清一些事情。

关于scala - Akka http客户端连接池的正确使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34872793/

相关文章:

scala - 如何获取 HDFS 上存在的文件的创建日期?

scala - Spark程序性能——GC&任务反序列化&并发执行

scala - Play Scala Akka WebSockets 更改 Actor 路径

scala - Scala 中的 OSGi 注释(Activate、Reference、Component)

scala - 在流中链接 Akka-http-client 请求

java - 如何将数据从重复列表数据转换为组数据列表

scala - 从 Akka HttpResponse 检索 Cookie

scala - 使用 Akka 将文件从服务器流式传输到客户端

scala - 为什么 Source.tick 在 100 个 Http 请求后停止?

scala - AKKA HTTP 源流与 Futures