java - 如何制作 block 感知执行上下文?

标签 java multithreading scala concurrency executioncontext

出于某种原因,我无法全神贯注地实现它。我有一个运行 Play 的应用程序呼唤Elastic Search .作为我设计的一部分,我的服务使用 Java API 与 scala future 一起包装,如本 blog post 所示。 .我已经更新了该帖子中的代码,以向 ExecutionContext 提示它将执行一些阻塞 I/O,如下所示:

    import scala.concurent.{blocking, Future, Promise}
    import org.elasticsearch.action.{ActionRequestBuilder, ActionListener, ActionResponse }
    def execute[RB <: ActionRequestBuilder[_, T, _, _]](request: RB): Future[T] = {
        blocking {
            request.execute(this)
            promise.future
        }
    }

我构建查询以发送到 ES 的实际服务将 executionContext 作为构造函数参数,然后将其用于调用 Elasticsearch 。我这样做是为了让播放使用的全局执行上下文不会让它的线程被对 ES 的阻塞调用所束缚。 This S.O. comment提到只有全局上下文是阻塞感知的,所以这让我不得不创建自己的。在同一篇文章/答案中,有很多关于使用 ForkJoin 池的信息,但我不确定如何获取 those docs 中所写的内容。并将其与 hints in the blocking documentation 结合起来创建响应阻塞提示的执行上下文。

我认为我遇到的问题之一是我不确定首先如何响应阻塞上下文?我正在阅读 best practices它使用的示例是一个无限制的线程缓存:

Note that here I prefer to use an unbounded "cached thread-pool", so it doesn't have a limit. When doing blocking I/O the idea is that you've got to have enough threads that you can block. But if unbounded is too much, depending on use-case, you can later fine-tune it, the idea with this sample being that you get the ball rolling.

那么这是否意味着使用我的 ForkJoin 支持的线程池,我应该在处理非阻塞 I/O 时尝试使用缓存线程并为阻塞 IO 创建一个新线程?或者是其他东西?我在网上找到的几乎所有关于使用单独线程池的资源都倾向于做 Neophytes guide does 的事情。 ,也就是说:

How to tune your various thread pools is highly dependent on your individual application and beyond the scope of this article.

我知道这取决于您的应用程序,但在这种情况下,如果我只想创建某种类型的阻塞感知 ExecutionContext 并了解管理线程的合适策略。如果 Context 专门用于应用程序的单个部分,我是否应该只设置固定的线程池大小,而不是一开始就使用/忽略 blocking 关键字?

我倾向于漫无边际,所以我会尝试在答案中分解我正在寻找的内容:

  1. 代码!阅读所有这些文档仍然让我感觉自己无法编写阻塞感知上下文的代码,我真的很感激一个例子。
  2. 关于如何处理阻塞线程的任何链接或提示,即无休止地为它们创建一个新线程,检查可用线程的数量,如果太多则拒绝,一些其他策略
  3. 我不是在这里寻找性能提示,我知道我只能通过测试获得它,但如果我一开始就不知道如何对上下文进行编码,我就无法测试!我确实找到了 an example of ForkJoins vs threadpools但我错过了有关 blocking 的关键部分。

很抱歉这里的问题很长,我只是想让你了解我正在看什么,我已经花了一天多的时间来解决这个问题,需要一些外部帮助。


编辑:为了清楚起见,ElasticSearch 服务的构造函数签名是:

//Note that these are not implicit parameters!
class ElasticSearchService(otherParams ..., val executionContext: ExecutionContext)

在我的应用程序启动代码中,我有这样的东西:

object Global extends GlobalSettings {
    val elasticSearchContext = //Custom Context goes here
    ...
    val elasticSearchService = new ElasticSearchService(params, elasticSearchContext);
    ...
}

我也在通读Play's recommendations for contexts ,但还没有看到任何关于阻塞提示的信息,我怀疑我可能需要查看源代码以查看它们是否扩展了 BlockContext 特征。

最佳答案

所以我深入研究了文档和Play's best practices因为我正在处理的情况是

In certain circumstances, you may wish to dispatch work to other thread pools. This may include CPU heavy work, or IO work, such as database access. To do this, you should first create a thread pool, this can be done easily in Scala:

并提供一些代码:

object Contexts {
    implicit val myExecutionContext: ExecutionContext = Akka.system.dispatchers.lookup("my-context")
}

上下文来自 Akka,所以我跑到那里搜索他们提供的默认值和上下文类型,最终我找到了 documentation on dispatchers .默认值为 ForkJoinPool其管理 block 的默认方法是调用 managedBlock(blocker)。这让我阅读了说明的文档:

Blocks in accord with the given blocker. If the current thread is a ForkJoinWorkerThread, this method possibly arranges for a spare thread to be activated if necessary to ensure sufficient parallelism while the current thread is blocked.

所以看起来如果我有一个 ForkJoinWorkerThread 那么我想我想要的行为就会发生。进一步查看 ForkJoinPool 的源代码,我注意到默认的线程工厂是:

val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory

这对我来说意味着如果我在 Akka 中使用默认设置,我将获得一个以我期望的方式处理阻塞的上下文。

所以再次阅读 Akka 文档似乎指定我的上下文是这样的:

my-context {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 3.0
    parallelism-max = 64
    task-peeking-mode = "FIFO"
  }
  throughput = 100
}

会是我想要的。

当我在源代码中搜索时,我做了一些寻找 blocking 或调用 managedBlock 的用途,并在 ThreadPoolBuilder 中找到了覆盖 ForkJoin 行为的示例。

private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
    override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = {
      val result = new AtomicReference[Option[T]](None)
      ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
        def block(): Boolean = {
          result.set(Some(thunk))
          true
        }
        def isReleasable = result.get.isDefined
      })
      result.get.get // Exception intended if None
    }
  }

似乎像我最初要求的作为如何制作实现 BlockContext 的东西的示例。该文件还包含显示如何制作 ExecutorServiceFactory 的代码,我相信这是 由配置的 executor 部分引用。所以我想如果我想拥有我会怎么做 一个完全自定义的上下文将扩展某种类型的 WorkerThread 并编写我自己的使用自定义工作线程的 ExecutorServiceFactory,然后在属性中指定完全限定的类名,如 this post advises .

我可能会使用 Akka 的 forkjoin :)

关于java - 如何制作 block 感知执行上下文?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35065496/

相关文章:

java - 在 jboss 7 上部署 java EE 应用程序

java - java中传递参数的通用可变个数

scala - 杜比。组合 .update.withGeneratedKeys() 和 .update.run

Scala:如何转义文字中的反引号?

java - 在 Android 上运行 libgdx 应用程序时出现异常

java - 在带有 POI 的 Excel 单元格中显示 0 而不是错误值

c++ - 文件线程

multithreading - 如何推理互斥体构成的正确性?

Java程序卡在特定线程上

scala - 如何在 Scala 中完全避免运行时反射?