scala - Future map 和 foreach 的行为

标签 scala

我有以下代码,可以在 map 和 future 中打印当前线程名称

object ConcurrencyTest1 {
  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    println("main thread: " + Thread.currentThread().getName)


    Future {
      println("future job: " + Thread.currentThread().getName)
      Thread.sleep(1000)
      10
    }.map { x => {
      println("map: " + Thread.currentThread().getName)
      Thread.sleep(1000)
      x * x
    }
    }.foreach { x => {
      Thread.sleep(1000)
      println("foreach: " + Thread.currentThread().getName)
      println(x)
    }
    }

    Thread.sleep(5000)
  }
}

输出为:

main thread: main
future job: ForkJoinPool-1-worker-5
map: ForkJoinPool-1-worker-5
foreach: ForkJoinPool-1-worker-5

从输出来看,future job、map 和 foreach 都在同一个线程中运行。 我想问这个结果是否是确定性的,即它总是会输出相同的结果。或者,它们可能在不同的线程中运行

最佳答案

我不认为这是有保证的......

但是,在您的情况下,我认为这是从 Thread.sleep 调用得出的(当它们完成时,等待处理的下一个 Future 要么是后续的 map/foreach;其他 Future 是已经在其他可用线程上启动)。

您可以通过删除 Thread.sleep 调用来看到这一点:

注意:为了获取更多数据,需要迭代多个参数。

object ConcurrencyTestNoSleep {
  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    println("main thread: " + Thread.currentThread().getName)

    args.map { a =>
      Future {
        println(s"future job $a: " + Thread.currentThread().getName)
        // Thread.sleep(1)
        10
      }.map { x => {
        println(s"map $a: " + Thread.currentThread().getName)
        // Thread.sleep(1)
        x * x
      }
      }.foreach { x => {
        // Thread.sleep(1)
        println(s"foreach $a:" + Thread.currentThread().getName)
        // println(x)
      }
    }
  }
  Thread.sleep(5000)
  }
}

运行时:

@ ConcurrencyTestNoSleep.main(Array[String]("a", "b", "c", "d", "e", "f", "g") )
main thread: main
future job a: scala-execution-context-global-954
map a: scala-execution-context-global-954
foreach a:scala-execution-context-global-955
future job c: scala-execution-context-global-955
map c: scala-execution-context-global-955
foreach c:scala-execution-context-global-955
future job d: scala-execution-context-global-955
map d: scala-execution-context-global-955
foreach d:scala-execution-context-global-955
future job e: scala-execution-context-global-955
map e: scala-execution-context-global-955
foreach e:scala-execution-context-global-955
future job f: scala-execution-context-global-955
map f: scala-execution-context-global-955
foreach f:scala-execution-context-global-955
future job g: scala-execution-context-global-955
map g: scala-execution-context-global-955
foreach g:scala-execution-context-global-955
future job b: scala-execution-context-global-954
map b: scala-execution-context-global-954
foreach b:scala-execution-context-global-954

您可以在第三行看到,"a" 这次运行在两个不同的线程上。


但是,通过 sleep ,似乎确实重用了相同的线程:

object ConcurrencyTestSleep {
  def main(args: Array[String]) {
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    println("main thread: " + Thread.currentThread().getName)

    args.map { a =>
      Future {
        println(s"future job $a: " + Thread.currentThread().getName)
        Thread.sleep(1)
        10
      }.map { x => {
        println(s"map $a: " + Thread.currentThread().getName)
        Thread.sleep(1)
        x * x
      }
      }.foreach { x => {
        Thread.sleep(1)
        println(s"foreach $a:" + Thread.currentThread().getName)
        // println(x)
      }
    }
  }
  Thread.sleep(5000)
  }
}

这里每个参数使用相同的线程(在映射和 foreach 中):

@ ConcurrencyTestSleep.main(Array[String]("a", "b", "c", "d", "e", "f", "g") )
main thread: main
future job a: scala-execution-context-global-1162
map a: scala-execution-context-global-1162
future job c: scala-execution-context-global-1164
future job b: scala-execution-context-global-1163
future job d: scala-execution-context-global-1165
map b: scala-execution-context-global-1163
map c: scala-execution-context-global-1164
map d: scala-execution-context-global-1165
foreach a:scala-execution-context-global-1162
future job e: scala-execution-context-global-1162
foreach b:scala-execution-context-global-1163
foreach d:scala-execution-context-global-1165
future job g: scala-execution-context-global-1165
foreach c:scala-execution-context-global-1164
map e: scala-execution-context-global-1162
future job f: scala-execution-context-global-1163
map g: scala-execution-context-global-1165
map f: scala-execution-context-global-1163
foreach e:scala-execution-context-global-1162
foreach f:scala-execution-context-global-1162
foreach g:scala-execution-context-global-1165

也就是说,我不会依赖这个!

关于scala - Future map 和 foreach 的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50284424/

相关文章:

scala - 为什么sortByKey的 Spark 这么慢?他们有其他选择吗?

scala - Akka 拦截具有可堆叠行为的接收

scala - 如何将 Map[String,Seq[String]] 转换为 Map[String,String]

scala - 如何实现 ScalaTest FunSuite 以避免样板 Spark 代码和导入隐式

scala - 无形 .toHList 的行为

java - 如何让 IntelliJ 识别 Play Framework *.scala.xml 模板

scala - Scala 中变量的返回类型

scala - Apache Spark 3 和向后兼容性?

java - Scala java 兼容性问题。

scala - 使用 Scala 读取类路径下的属性文件