我正在尝试使用 Akka HTTP 对我的请求进行基本身份验证。
碰巧我有一个外部资源要通过身份验证,所以我必须对这个资源进行休息调用。
这需要一些时间,在处理过程中,我的 API 的其余部分似乎被阻塞,等待这个调用。
我用一个非常简单的例子重现了这个:
// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val routes =
(post & entity(as[String])) { e =>
complete {
Future{
Thread.sleep(5000)
e
}
}
} ~
(get & path(Segment)) { r =>
complete {
"get"
}
}
如果我发布到日志端点,我的 get 端点也会卡在等待日志端点指定的 5 秒。
这是预期的行为,如果是,我如何在不阻塞整个 API 的情况下进行阻塞操作?
最佳答案
你观察到的是预期的行为——当然,这是非常糟糕的。好的,已知的解决方案和最佳实践存在以防止它。在这个答案中,我想花一些时间来短、长和深入地解释这个问题——享受阅读吧!
简答 :“不要阻塞路由基础设施!”,始终使用专用调度程序进行阻塞操作!
观察到的症状的原因:问题是您正在使用 context.dispatcher
作为调度员,阻塞 future 继续执行。路由基础设施使用相同的调度程序(简单来说只是“一堆线程”)来实际处理传入的请求——因此,如果您阻止所有可用线程,最终会使路由基础设施挨饿。 (辩论和基准测试的一个问题是,如果 Akka HTTP 可以防止这种情况发生,我会将其添加到我的研究待办事项列表中)。
必须特别小心地处理阻塞,以免影响同一调度程序的其他用户(这就是为什么我们将执行分离到不同的用户如此简单的原因),如 Akka 文档部分所述:Blocking needs careful management .
我想在这里提请注意的另一件事是,如果可能的话,应该完全避免阻塞 API - 如果您长时间运行的操作实际上不是一个操作,而是一系列操作,则您可以将它们分离到不同的参与者或序列化的 future 。无论如何,只是想指出——如果可能,避免这种阻塞调用,但如果你必须——那么下面解释了如何正确处理这些调用。
深度分析及解决方案 :
现在我们知道出了什么问题,从概念上讲,让我们看看上面代码中到底有什么问题,以及这个问题的正确解决方案是怎样的:
颜色 = 线程状态:
现在让我们研究 3 段代码以及它们如何影响调度程序和应用程序的性能。为了强制这种行为,应用程序被置于以下负载下:
1) [bad]
调度程序对错误代码的行为 :// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher
val routes: Route = post {
complete {
Future { // uses defaultDispatcher
Thread.sleep(5000) // will block on the default dispatcher,
System.currentTimeMillis().toString // starving the routing infra
}
}
}
所以我们将我们的应用程序暴露给 [a] 负载,你可以看到许多 akka.actor.default-dispatcher 线程已经 - 他们正在处理请求 - 绿色小片段,橙色意味着其他人实际上在那里闲置。
然后我们开始 [b] 加载,这会导致这些线程阻塞——你可以看到一个早期的线程“default-dispatcher-2,3,4”在之前空闲后进入阻塞状态。我们还观察到池在增长——新线程开始“default-dispatcher-18,19,20,21...”但是它们立即进入休眠状态(!)——我们在这里浪费了宝贵的资源!
此类启动线程的数量取决于默认调度程序配置,但可能不会超过 50 个左右。由于我们刚刚启动了 2k 个阻塞操作,我们使整个线程池饿死了——阻塞操作占主导地位,以至于路由基础设施没有可用的线程来处理其他请求——非常糟糕!
让我们做点什么(顺便说一句,这是 Akka 的最佳实践——始终隔离阻塞行为,如下所示):
2) [good!]
调度员行为良好的结构化代码/调度员 :在您的
application.conf
配置这个专用于阻塞行为的调度程序:my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
// in Akka previous to 2.4.2:
core-pool-size-min = 16
core-pool-size-max = 16
max-pool-size-min = 16
max-pool-size-max = 16
// or in Akka 2.4.2+
fixed-pool-size = 16
}
throughput = 100
}
您应该在 Akka Dispatchers 中阅读更多信息文档,以了解此处的各种选项。主要的一点是我们选择了一个
ThreadPoolExecutor
它对阻塞操作保持可用的线程数有一个硬限制。大小设置取决于您的应用程序的功能以及您的服务器拥有的核心数。接下来我们需要使用它,而不是默认的:
// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
val routes: Route = post {
complete {
Future { // uses the good "blocking dispatcher" that we configured,
// instead of the default dispatcher – the blocking is isolated.
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
我们使用相同的负载对应用程序施加压力,首先是一些正常的请求,然后我们添加阻塞的请求。这是 ThreadPools 在这种情况下的行为方式:
所以最初正常的请求很容易被默认的调度程序处理,你可以在那里看到几条绿线 - 这是实际执行(我并没有真正将服务器置于重负载下,所以它大多处于空闲状态)。
现在,当我们开始发出阻塞操作时,
my-blocking-dispatcher-*
启动,并启动到配置的线程数。它处理所有的 Sleeping 在那里。此外,在这些线程上一段时间没有发生任何事情后,它会关闭它们。如果我们用另一组阻塞来攻击服务器,池将启动新线程来处理 sleep()-ing 它们,但与此同时 - 我们不会浪费我们宝贵的线程“只是呆在那里没做什么”。使用此设置时,正常 GET 请求的吞吐量没有受到影响,它们仍然很高兴地在(仍然非常免费)默认调度程序上提供服务。
这是处理响应式(Reactive)应用程序中任何类型的阻塞的推荐方法。它通常被称为“隔板”(或“隔离”)应用程序的不良行为部分,在这种情况下,不良行为是 sleep /阻塞。
3) [workaround-ish]
blocking
时的调度员行为正确应用 :在本例中,我们使用 scaladoc for
scala.concurrent.blocking
在遇到阻塞操作时可以提供帮助的方法。它通常会导致更多的线程被启动以在阻塞操作中幸存下来。// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher
val routes: Route = post {
complete {
Future { // uses the default dispatcher (it's a Fork-Join Pool)
blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat,
// but at the cost of exploding the number of threads (which eventually
// may also lead to starvation problems, but on a different layer)
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
}
该应用程序的行为如下:
你会注意到创建了很多新线程,这是因为阻塞提示“哦,这会阻塞,所以我们需要更多线程”。这导致我们被阻塞的总时间比 1) 示例中的要小,但是在阻塞操作完成后我们有数百个线程什么都不做……当然,它们最终会被关闭(FJP 会这样做) ),但有一段时间我们将有大量(不受控制的)线程在运行,与 2) 解决方案相比,我们确切地知道有多少线程专用于阻塞行为。
总结 :永远不要阻止默认调度程序:-)
最佳实践是使用 中所示的模式。
2)
, 为可用的阻塞操作提供一个调度程序,并在那里执行它们。希望这会有所帮助,哈克快乐!
讨论 Akka HTTP 版本 :
2.0.1
使用的分析器:很多人私下里问我,我用什么分析器来可视化上面图片中的线程状态,所以在此处添加此信息:我使用了 YourKit这是一个很棒的商业分析器(OSS 免费),尽管您可以使用免费的 VisualVM from OpenJDK 获得相同的结果。 .
关于scala - Akka HTTP : Blocking in a future blocks the server,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34641861/