scala - Akka HTTP : Blocking in a future blocks the server

标签 scala akka future akka-http

我正在尝试使用 Akka HTTP 对我的请求进行基本身份验证。
碰巧我有一个外部资源要通过身份验证,所以我必须对这个资源进行休息调用。

这需要一些时间,在处理过程中,我的 API 的其余部分似乎被阻塞,等待这个调用。
我用一个非常简单的例子重现了这个:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

如果我发布到日志端点,我的 get 端点也会卡在等待日志端点指定的 5 秒。

这是预期的行为,如果是,我如何在不阻塞整个 API 的情况下进行阻塞操作?

最佳答案

你观察到的是预期的行为——当然,这是非常糟糕的。好的,已知的解决方案和最佳实践存在以防止它。在这个答案中,我想花一些时间来短、长和深入地解释这个问题——享受阅读吧!

简答 :“不要阻塞路由基础设施!”,始终使用专用调度程序进行阻塞操作!

观察到的症状的原因:问题是您正在使用 context.dispatcher作为调度员,阻塞 future 继续执行。路由基础设施使用相同的调度程序(简单来说只是“一堆线程”)来实际处理传入的请求——因此,如果您阻止所有可用线程,最终会使路由基础设施挨饿。 (辩论和基准测试的一个问题是,如果 Akka HTTP 可以防止这种情况发生,我会将其添加到我的研究待办事项列表中)。

必须特别小心地处理阻塞,以免影响同一调度程序的其他用户(这就是为什么我们将执行分离到不同的用户如此简单的原因),如 Akka 文档部分所述:Blocking needs careful management .

我想在这里提请注意的另一件事是,如果可能的话,应该完全避免阻塞 API - 如果您长时间运行的操作实际上不是一个操作,而是一系列操作,则您可以将它们分离到不同的参与者或序列化的 future 。无论如何,只是想指出——如果可能,避免这种阻塞调用,但如果你必须——那么下面解释了如何正确处理这些调用。

深度分析及解决方案 :

现在我们知道出了什么问题,从概念上讲,让我们看看上面代码中到底有什么问题,以及这个问题的正确解决方案是怎样的:

颜色 = 线程状态:

  • 绿松石 – sleep
  • 橙色 - 等待
  • 绿色 - RUNNABLE

  • 现在让我们研究 3 段代码以及它们如何影响调度程序和应用程序的性能。为了强制这种行为,应用程序被置于以下负载下:
  • [a] 继续请求 GET 请求(请参阅上面最初问题中的代码),它不会在那里阻塞
  • [b] 然后在一段时间后触发 2000 个 POST 请求,这将导致在返回 future 之前的 5 秒阻塞

  • 1) [bad]调度程序对错误代码的行为 :
    // BAD! (due to the blocking in Future):
    implicit val defaultDispatcher = system.dispatcher
    
    val routes: Route = post { 
      complete {
        Future { // uses defaultDispatcher
          Thread.sleep(5000)                    // will block on the default dispatcher,
          System.currentTimeMillis().toString   // starving the routing infra
        }
      }
    }
    

    所以我们将我们的应用程序暴露给 [a] 负载,你可以看到许多 akka.actor.default-dispatcher 线程已经 - 他们正在处理请求 - 绿色小片段,橙色意味着其他人实际上在那里闲置。

    blocking is killing the default dispatcher

    然后我们开始 [b] 加载,这会导致这些线程阻塞——你可以看到一个早期的线程“default-dispatcher-2,3,4”在之前空闲后进入阻塞状态。我们还观察到池在增长——新线程开始“default-dispatcher-18,19,20,21...”但是它们立即进入休眠状态(!)——我们在这里浪费了宝贵的资源!

    此类启动线程的数量取决于默认调度程序配置,但可能不会超过 50 个左右。由于我们刚刚启动了 2k 个阻塞操作,我们使整个线程池饿死了——阻塞操作占主导地位,以至于路由基础设施没有可用的线程来处理其他请求——非常糟糕!

    让我们做点什么(顺便说一句,这是 Akka 的最佳实践——始终隔离阻塞行为,如下所示):

    2) [good!]调度员行为良好的结构化代码/调度员 :

    在您的 application.conf配置这个专用于阻塞行为的调度程序:
    my-blocking-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        // in Akka previous to 2.4.2:
        core-pool-size-min = 16
        core-pool-size-max = 16
        max-pool-size-min = 16
        max-pool-size-max = 16
        // or in Akka 2.4.2+
        fixed-pool-size = 16
      }
      throughput = 100
    }
    

    您应该在 Akka Dispatchers 中阅读更多信息文档,以了解此处的各种选项。主要的一点是我们选择了一个 ThreadPoolExecutor它对阻塞操作保持可用的线程数有一个硬限制。大小设置取决于您的应用程序的功能以及您的服务器拥有的核心数。

    接下来我们需要使用它,而不是默认的:
    // GOOD (due to the blocking in Future):
    implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
    
    val routes: Route = post { 
      complete {
        Future { // uses the good "blocking dispatcher" that we configured, 
                 // instead of the default dispatcher – the blocking is isolated.
          Thread.sleep(5000)
          System.currentTimeMillis().toString
        }
      }
    }
    

    我们使用相同的负载对应用程序施加压力,首先是一些正常的请求,然后我们添加阻塞的请求。这是 ThreadPools 在这种情况下的行为方式:

    the blocking pool scales to our needs

    所以最初正常的请求很容易被默认的调度程序处理,你可以在那里看到几条绿线 - 这是实际执行(我并没有真正将服务器置于重负载下,所以它大多处于空闲状态)。

    现在,当我们开始发出阻塞操作时,my-blocking-dispatcher-*启动,并启动到配置的线程数。它处理所有的 Sleeping 在那里。此外,在这些线程上一段时间没有发生任何事情后,它会关闭它们。如果我们用另一组阻塞来攻击服务器,池将启动新线程来处理 sleep()-ing 它们,但与此同时 - 我们不会浪费我们宝贵的线程“只是呆在那里没做什么”。

    使用此设置时,正常 GET 请求的吞吐量没有受到影响,它们仍然很高兴地在(仍然非常免费)默认调度程序上提供服务。

    这是处理响应式(Reactive)应用程序中任何类型的阻塞的推荐方法。它通常被称为“隔板”(或“隔离”)应用程序的不良行为部分,在这种情况下,不良行为是 sleep /阻塞。

    3) [workaround-ish] blocking 时的调度员行为正确应用 :

    在本例中,我们使用 scaladoc for scala.concurrent.blocking 在遇到阻塞操作时可以提供帮助的方法。它通常会导致更多的线程被启动以在阻塞操作中幸存下来。
    // OK, default dispatcher but we'll use `blocking`
    implicit val dispatcher = system.dispatcher
    
    val routes: Route = post { 
      complete {
        Future { // uses the default dispatcher (it's a Fork-Join Pool)
          blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                     // but at the cost of exploding the number of threads (which eventually
                     // may also lead to starvation problems, but on a different layer)
            Thread.sleep(5000)
            System.currentTimeMillis().toString
           }
        }
      }
    }
    

    该应用程序的行为如下:

    blocking causes more threads to be started

    你会注意到创建了很多新线程,这是因为阻塞提示“哦,这会阻塞,所以我们需要更多线程”。这导致我们被阻塞的总时间比 1) 示例中的要小,但是在阻塞操作完成后我们有数百个线程什么都不做……当然,它们最终会被关闭(FJP 会这样做) ),但有一段时间我们将有大量(不受控制的)线程在运行,与 2) 解决方案相比,我们确切地知道有多少线程专用于阻塞行为。

    总结 :永远不要阻止默认调度程序:-)

    最佳实践是使用 中所示的模式。 2) , 为可用的阻塞操作提供一个调度程序,并在那里执行它们。

    希望这会有所帮助,哈克快乐!

    讨论 Akka HTTP 版本 :2.0.1
    使用的分析器:很多人私下里问我,我用什么分析器来可视化上面图片中的线程状态,所以在此处添加此信息:我使用了 YourKit这是一个很棒的商业分析器(OSS 免费),尽管您可以使用免费的 VisualVM from OpenJDK 获得相同的结果。 .

    关于scala - Akka HTTP : Blocking in a future blocks the server,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34641861/

    相关文章:

    Java 与 Scala 语法比较

    java - 知道akka actor存在的三种方法

    scala - Akka future - 并行与并发?

    scala - 家庭多态性+混合?

    Scala:如何找到超过 2 个元素的最小值?

    java - AKKA .conf 文件配置到 .properties 文件

    scala - 将 futures 与 actor 消息混合时确保测试中的消息顺序

    flutter - 'Future<String>' 的实例而不是显示值

    asynchronous - 如何优雅地关闭 Tokio 运行时以响应 SIGTERM?

    scala - 提交失败异常 : Commit cannot be completed since the group has already rebalanced