multithreading - ExecutionContext.global和主线程之间的区别

标签 multithreading scala scala-cats

我有以下代码:

object KafkaApi {

  private implicit val main: ExecutionContextExecutor = ExecutionContext.global
  private val workers = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

  def main(args: Array[String]) {

    foo.unsafeRunAsync(_ => ())
    //foo.unsafeRunSync()
    println("Hello")


  }

  def foo: IO[Unit] =
    for {
      _ <- IO {
        println(Thread.currentThread().getName)
      }
      _ <- IO.shift(workers)
      _ <- IO {
        println(Thread.currentThread().getName)
      }
      _ <- IO {
        println(Thread.currentThread().getName)
      }
      _ <- IO {
        println(Thread.currentThread().getName)
      }
      _ <- IO.shift(main)
      _ <- IO {
        println(Thread.currentThread().getName)
      }
      _ <- IO {
        println(Thread.currentThread().getName)
      }
      _ <- IO {
        println(Thread.currentThread().getName)
      }
    } yield ()
}

输出为:
main
Hello
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
scala-execution-context-global-14
scala-execution-context-global-14
scala-execution-context-global-14

main scala-execution-context-global-14 有什么区别?

如果这两个不同,如何恢复主线程?

最佳答案

Running the code above, why the application never get terminated?



这个额外的问题太大了,无法发表评论,因此我在这里回答。

事实是,在JVM中,所有Thread都分为“普通”和"daemon"线程。这里重要的是

The Java Virtual Machine exits when the only threads running are all daemon threads.



因此,如果您有任何正在运行的非守护进程Thread,则JVM会认为您的应用程序仍在工作,即使它实际上什么也不做(也许只是在等待一些输入)。 “主”线程显然是“普通”线程。由标准ExecutionContext.global创建的线程是守护程序,因此不会在主线程完成后阻止您的应用程序退出。由Java的Executors.newCachedThreadPool创建的线程是非守护程序,因此可以使应用程序保持 Activity 状态。有几种可能的解决方案:
  • 除了ExecutionContext外,请勿使用其他global,即完全不使用Executors.newCachedThreadPool。根据您的情况,这可能是您想要的,也可能不是。
  • 完成所有工作后,显式 shutdown 您的自定义ExecutorService。这里要小心,因为shutdown不会等待所有 Activity 任务完成。所以代码应该变成像

  • private val pool = Executors.newCachedThreadPool
    implicit private val workers = ExecutionContext.fromExecutor(pool)
    
    // do whatever you want with workers 
    
    
    // optionally wait for all the work to be done
    
    pool.shutdown()
    
  • 使用创建守护程序线程的自定义池。例如,您可以执行以下操作:

  • val workers = ExecutionContext.fromExecutor(Executors.newCachedThreadPool(new ThreadFactory {
      private val defaultDelegate = Executors.defaultThreadFactory()
    
      override def newThread(r: Runnable): Thread = {
        val t = defaultDelegate.newThread(r)
        //let the default factory do all the job and just override daemon-flag 
        t.setDaemon(true)
        t
      }
    }))
    

    恕我直言,#2和#3之间的主要权衡是便利性与正确性。在#3中,您不必考虑所有任务的完成位置,因此可以安全地调用shutdown,这很方便。代价是,如果由于某种原因您在执行其他所有任务之前判断错误并退出了“主”线程,您将不会知道任何错误,因为守护程序线程将被静默杀死。如果您使用#2并发生相同的错误,或者您未在该代码路径中调用shutdown,则您的应用将继续运行,或者您会在日志中看到一些警告,指出在有一些任务的情况下池已关闭进行中。因此,如果这只是一长串处理的​​中间步骤,出于某种原因需要自定义线程池,那么我可能会选择#3;但是如果这种并行执行是主要行为,那么我将采用更明确的#2方法。

    关于multithreading - ExecutionContext.global和主线程之间的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47578313/

    相关文章:

    scala - 使用ProGuard缩小Scala

    scala - 如何在 spark RetryingBlockFetcher IOException 中设置 remoteHost

    scala - 扩展副产品的自然转化

    python - 为什么子线程无法访问flask_login中的current_user变量?

    java - 对必须手动中断的异步计算进行单元测试

    c++ - 如何阻止和唤醒 boost 线程?

    java - Hazelcast 分布式计算的基本概念

    scala - Scala 可以通过引用调用吗?

    scala - Monad 与 Future 的应用仿函数

    scala - Cats Scala 中的序列和遍历来映射类型