java - 在REPL中的Scala中具有java.util.concurrent._的死锁

标签 java scala parallel-processing java.util.concurrent scala-repl

我在学习Paul Chiusano和Runar Bjanarson的著作“Scala中的函数编程”(第7章-纯函数并行性)时遇到了以下情况。

    package fpinscala.parallelism

    import java.util.concurrent._
    import language.implicitConversions


    object Par {
      type Par[A] = ExecutorService => Future[A]

      def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

      def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it.

      private case class UnitFuture[A](get: A) extends Future[A] {
        def isDone = true
        def get(timeout: Long, units: TimeUnit) = get
        def isCancelled = false
        def cancel(evenIfRunning: Boolean): Boolean = false
      }

      def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
        (es: ExecutorService) => {
          val af = a(es)
          val bf = b(es)
          UnitFuture(f(af.get, bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`, applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`, then subtracts that time from the available time allocated for evaluating `bf`.
        }

      def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`, but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`, this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
        es => es.submit(new Callable[A] {
          def call = a(es).get
        })

      def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

 def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    p(e).get == p2(e).get

}

您可以在Github here上找到原始代码。有关java.util.concurrent文档,请参见here

我关注fork的实现。特别地,当ThreadPool太小时,据称fork可能导致死锁。

我考虑以下示例:
val a = Par.lazyUnit(42 + 1)
val es: ExecutorService = Executors.newFixedThreadPool(2)
println(Par.fork(a)(es).get)  

我不希望这个示例最终陷入僵局,因为有两个线程。但是,当我在Scala REPL中运行它时,它将在我的计算机上运行。为什么会这样呢?

初始化ExecutorService时的输出为
es:java.util.concurrent.ExecutorService =
java.util.concurrent.ThreadPoolE
xecutor@73a86d72[Running, pool size = 0, active threads = 0, queued tasks =
 0, completed tasks = 0]
pool size = 0在这里正确吗?换句话说,这是不了解java.util.concurrent._的问题还是不了解Scala部分的问题?

最佳答案

好吧,经过长时间的调查,我相信我会回答。完整的故事很长,但是我将尝试通过简化和避免许多细节来缩短它。

注意:可以将Scala编译为各种不同的目标平台,但是这个特定问题发生在以Java/JVM为目标的情况下,因此这就是此答案的内容。

您看到的死锁与线程池的大小无关。实际上是挂起的外部fork调用。它与REPL实现细节和多线程结合在一起,但是需要学习一些知识才能理解它是如何发生的:

  • Scala REPL如何工作
  • Scala如何将object编译为Java/JVM
  • Scala如何在Java/JVM上模拟名称参数
  • Java/JVM如何运行类
  • 的静态初始化程序

    一个简短的版本(另请参见摘要结尾)是该代码卡在REPL之下,因为在REPL执行该代码时,它在逻辑上类似于以下代码:

    object DeadLock {
    
      import scala.concurrent._
      import scala.concurrent.duration.Duration
      import scala.concurrent.ExecutionContext.Implicits.global
    
      val foo: Int = Await.result(Future(calc()), Duration.Inf)
    
      def printFoo(): Unit = {
        println(s"Foo = $foo")
      }
    
      private def calc(): Int = {
        println("Before calc")
        42
      }
    }
    
    
    def test(): Unit = {
      println("Before printFoo")
      DeadLock.printFoo()
      println("After printFoo")
    } 
    

    或在Java世界中非常相似:
    class Deadlock {
        static CompletableFuture<Integer> cf;
        static int foo;
    
        public static void printFoo() {
            System.out.println("Print foo " + foo);
        }
    
        static {
            cf = new CompletableFuture<Integer>();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    calcF();
                }
            }).start();
            try {
                foo = cf.get();
                System.out.println("Future result = " + cf.get());
            } catch (InterruptedException e) {
                e.printStackTrace();f
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
    
        private static void calcF() {
            cf.complete(42);
        }
    }
    
    public static void main(String[] args) {
        System.out.println("Before foo");
        Deadlock.printFoo();
        System.out.println("After foo");
    }
    

    如果您清楚此代码为何会陷入僵局,那么您已经了解了大部分内容,并且可以自己推断出其余内容。您可能只需要看一下最后的摘要部分。

    Java静态初始化程序如何死锁?

    让我们从这个故事的结尾开始:为什么Java代码挂起?发生这种情况是因为Java/JVM对静态初始化程序有两个保证(有关更多详细信息,请参见JLS的12.4.2. Detailed Initialization Procedure部分):
  • 静态初始化程序将在对
  • 类的任何其他“外部”使用之前运行
  • 静态初始化程序将只运行一次,并通过全局锁定
  • 完成

    静态初始化程序使用的锁是隐式的,由JVM管理,但在那里。这意味着代码在逻辑上类似于以下内容:
    class Deadlock {
    
        static boolean staticInitFinished = false;
        // unique value for each thread!
        static ThreadLocal<Boolean> currentThreadRunsStaticInit = ThreadLocal.withInitial(() -> Boolean.FALSE);
    
    
        static CompletableFuture<Integer> cf;
        static int foo;
    
        static void enforceStaticInit() {
            synchronized (Deadlock.class) {
                // is init finished?
                if (staticInitFinished)
                    return;
                // are we the thread already running the init?
                if(currentThreadRunsStaticInit.get())
                    return;
                currentThreadRunsStaticInit.set(true);
    
                cf = new CompletableFuture<Integer>();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        calcF();
                    }
                }).start();
                try {
                    foo = cf.get();
                    System.out.println("Future result = " + cf.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                currentThreadRunsStaticInit.set(false);
                staticInitFinished = true;
            }
        }
    
        private static void calcF() {
            enforceStaticInit();
            cf.complete(42);
        }
    
        public static void printFoo() {
            enforceStaticInit();
            System.out.println("Print foo " + foo);
        }
    }
    

    现在很清楚为什么此代码会死锁:我们的静态初始化程序启动一个新线程并阻止等待其结果。但是,该新线程尝试访问相同的类(calcF方法),并且作为另一个线程,它必须等待已经运行的静态初始化程序完成。请注意,如果calcF方法在另一个类中,那么一切都会正常进行。

    Scala REPL的工作方式

    现在让我们回到有关Scala REPL如何工作的故事的开始。这个答案是对实际交易的极大简化,但是它捕获了有关此情况的详细信息的重要信息。幸运的是,对于REPL实现者来说,Scala编译器是用Scala编写的。这意味着REPL不必以某种方式解释代码,它可以通过标准编译器运行,然后通过Java Reflection API运行已编译的代码。这仍然需要对代码进行一些修饰,以使编译器满意并返回结果。

    当您键入类似的内容时,可以稍微简化一下(或者很多)

    val a = Par.lazyUnit(42 + 1)
    

    到REPL中,对代码进行分析并将其转换为类似以下内容的代码:

    package line3
    
    object read {
        val a = Par.lazyUnit(42 + 1)
        val res3 = a
    }
    
    object eval {
        def print() = {
            println("a: Par.Par[Int] = " + read.res3)
        }
    }
    

    然后通过反射调用line3.eval.print()

    类似的故事发生在:

    val es: ExecutorService = Executors.newFixedThreadPool(2)
    

    最后当你这样做
    Par.fork(a)(es).get
    

    事情变得更加有趣了,因为您对前面的行有依赖性,可以使用import巧妙地实现它:

    package line5
    
    object read {
        import line2.read.Par
        import line3.read.a
        import line4.read.es
    
        val res5 = Par.fork(a)(es).get
    }
    
    object eval {
        def print() = {
            println("res5: Int = " + read.res5)
        }
    }
    

    在这里的重要之处在于,您写入REPL的所有内容都被包装到全新的object中,然后作为常规代码进行编译和运行。

    Scala如何在Java/JVM上模拟名称参数
    fork方法的定义使用by-name parameter:
    def fork[A](a: => Par[A]): Par[A] =
    

    在这里,它用于懒惰地评估a,这对于fork的整个逻辑至关重要。 Java/JVM不对延迟评估提供标准支持,但是可以对其进行仿真,这就是Scala编译器的作用。在内部将签名更改为使用Function0:

    def fork[A](aWrapper: () => Par[A]): Par[A] = 
    

    每次对a的访问都将替换为对aWrapper.apply()的调用。魔术的另一部分发生在带有by-name参数的方法的调用方:该参数也应该包装在Function0中,这样代码就变成了类似

    object read {
        import line2.read.Par
        import line3.read.a
        import line4.read.es
    
        val res5 = Par.fork(() => a)(es).get
    }
    

    但这实际上有点不同。天真的,只为这个小功能要花一个类,而对于这样一个简单的逻辑来说,这是浪费的。在Scala 2.12中的实践中,使用了Java 8 LambdaMetafactory的魔力,因此代码的确变成了类似

    object read {
        import line2.read.Par
        import line3.read.a
        import line4.read.es
    
        def aWrapper():Int = a
    
        val res5 = Par.fork(aWrapper _)(es).get
    }
    

    其中aWrapper _表示将方法转换为Funciton0完成的LambdaMetafactory。您可能会在Java静态初始化程序死锁一章中对此有所怀疑,def aWrapper的引入是的关键区别。您已经可以看到该代码与挂起的答案中的第一个Scala代码段非常相似。

    Scala如何在Java/JVM上编译object

    最后一个难题是如何在Java/JVM中编译Scala object。好吧,实际上它被编译为类似于“静态类”的东西,但是由于您可以将object用作对象参数,因此它必须稍微复杂一些。实际上,所有初始化逻辑都移至object类的构造函数,并且有一个简单的静态初始化程序对其进行调用。因此,我们在Java中最后一个read对象将(忽略import)如下所示:
    class read$ {
        static read$ MODULE$
    
        static {
            new read$()
        }
    
        private Par[Int] res5;
    
        private read$() {
            MODULE$ = this;
            res5 = Par.fork(read$::aWrapper)(es).get
        }
    
        private static int aWrapper(){
            return line3.read$.MODULE$.a;
        }
    }
    

    这里read$::aWrapper再次表示使用Function0aWrapper方法构建LambdaMetafactory。换句话说,Scala object
    初始化被转换为作为Java静态初始化程序 的一部分运行的代码。

    摘要

    总结一下如何弄糟:
  • REPL将您的代码转换为每行的新object并对其进行编译
  • object初始化逻辑被翻译成Java静态初始化逻辑
  • 在简单情况下,将带有名字参数的方法的
  • 调用转换为包装“返回值”逻辑的方法,并将该方法添加到相同的classobject

  • 作为Par.fork初始化的一部分(即Java静态初始化程序的一部分)执行的
  • object尝试在不同的线程上评估by-name参数(即,调用同一类的方法),并阻止等待结果的执行。该线程
  • Java静态初始化程序在逻辑上在全局锁下执行,因此它阻止了不同的线程调用该方法。但是它本身被阻止等待该方法调用完成。
  • 关于java - 在REPL中的Scala中具有java.util.concurrent._的死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54390881/

    相关文章:

    java - *.json.swp 的 com.fasterxml.jackson.core.JsonParseException?

    scala - 使用 CSRF 保护测试 scala Play (2.2.1) Controller

    java - Scala reduce 和 fold 返回 0 而不是一个值

    bash - 带有 "export -f <func>"的 BASH 脚本中的 GNU Parallel 在 Crond 时失败并出现 "Command Not Found"错误

    java - 如何在grails应用程序中创建一个新目录来存储用户可以下载的jasper报告文件

    java - AES key 是随机的吗?

    scala - lift 的 sbt unmanagedClasspath 条目已编译但在运行时未找到

    c++ - parallel_for_each 上下文中的数组复制

    c# - 并发流图

    java - java中无需编码即可快速将CLOB转换为String