scala - 通过连接池发出 http 请求时 Akka Flow 挂起

标签 scala akka akka-stream akka-http

我正在使用 Akka 2.4.4 并尝试从 Apache HttpAsyncClient 迁移(未成功)。

下面是我在项目中使用的代码的简化版本。

问题是,如果我向流发送超过 1-3 个请求,它就会挂起。到目前为止,经过6个小时的调试,我什至找不到问题所在。我在 Decider 中没有看到异常、错误日志、事件。没什么:)

我尝试将 connection-timeout 设置减少为 1 秒,认为它可能正在等待服务器的响应,但没有帮助。

我做错了什么?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = myAdapter)
      .recoverWith {
        case ex =>
          println(ex)
          null
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]) {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

还有proxy support怎么了? ?似乎也不适合我。

最佳答案

您需要完全使用响应正文,以便连接可用于后续请求。如果您根本不关心响应实体,那么您可以将其排出到 Sink.ignore,如下所示:

resp.entity.dataBytes.runWith(Sink.ignore)

根据默认配置,使用主机连接池时,最大连接数设置为 4。每个池都有自己的队列,请求将在其中等待,直到其中一个打开的连接可用。如果该队列超过 32(默认配置,可以更改,必须是 2 的幂),那么您将开始看到失败。在您的例子中,您只执行 10 个请求,因此您没有达到该限制。但是,通过不消耗响应实体,您不会释放连接,其他所有内容都会在后面排队,等待连接释放。

关于scala - 通过连接池发出 http 请求时 Akka Flow 挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37061585/

相关文章:

scala - 使用 Scala 2.10 更新到 Eclipse Juno 后,多维数组声明失败

scala - 玩!异步操作 akka AskTimeoutException

java - ListBuffer[BaseType] 中的协方差/子类型

scala - 在 Akka 中测试 Actor

scala - 在 Akka Streams 中模拟源

scala - Spark 错误 :- "value foreach is not a member of Object"

java - 处理 Future onSuccess 作为 Akka Actor 的回应

java - Akka Http解析实体Java

scala - 使用 Akka/Akka Streams/Akka HTTP 时出现 Akka 版本冲突

scala - 如何使用 Akka-HTTP 客户端 websocket 发送消息