scala - 一个轻量级的 Scala fork join 语法

标签 scala parallel-processing fork-join

尽管即将推出 Java 7 标准 fork/join 框架,但我正在构建一些语法轻量级的辅助方法,供客户端并行运行代码。 这是一个可运行的主要方法来说明这个想法。

import actors.Futures

object ForkTest2 {



  def main(args: Array[String]) {
    test1
    test2
  }



  def test1 {
    val (a, b, c) =fork({
      Thread.sleep(500)
      println("inside fx1 ",+System.currentTimeMillis)
      true
    }, {
      Thread.sleep(1000)
      println("inside fx2 ",+System.currentTimeMillis)
      "stringResult"
    }, {
      Thread.sleep(1500)
      println("inside fx3 ",+System.currentTimeMillis)
      1
    })

    println(b, a, c)
    true
  }

  def test2 {
    val results = forkAll({
      () =>
              Thread.sleep(500)
              println("inside fx1 ",+System.currentTimeMillis)
              true
    }, {
      () =>
              Thread.sleep(1000)
              println("inside fx2 ",+System.currentTimeMillis)
              "stringResult"
    }, {
      () =>
              Thread.sleep(1500)
              println("inside fx3 ",+System.currentTimeMillis)
              1
    }, {
      () =>
              Thread.sleep(2000)
              println("inside fx4 ",+System.currentTimeMillis)
              1.023
    })

    println(results)
    true
  }

  val tenMinutes = 1000 * 60 * 10

  def fork[A, B, C](
          fx1: => A,
          fx2: => B,
          fx3: => C
          ) = {
    val re1 = Futures.future(fx1)
    val re2 = Futures.future(fx2)
    val re3 = Futures.future(fx3)
    //default wait 10 minutes
    val result = Futures.awaitAll(tenMinutes, re1, re2, re3)
    (
            result(0).asInstanceOf[Option[A]],
            result(1).asInstanceOf[Option[B]],
            result(2).asInstanceOf[Option[C]]

            )
  }

  type fxAny = () => Any

  def forkAll(
          fx1: fxAny*
          ): List[Any] = {
    val results = fx1.toList.map {fx: fxAny => Futures.future(fx())}
    Futures.awaitAll(tenMinutes, results: _*)
  }
}

样本输出是

(inside fx1 ,1263804802301)
(inside fx2 ,1263804802801)
(inside fx3 ,1263804803301)
(Some(stringResult),Some(true),Some(1))
(inside fx1 ,1263804803818)
(inside fx2 ,1263804804318)
(inside fx3 ,1263804804818)
(inside fx4 ,1263804805318)
List(Some(true), Some(stringResult), Some(1), Some(1.023))

测试 1 说明了类型安全的返回类型

测试 2 说明了任意输入参数

我希望将这两种测试方法结合起来,以便客户端代码可以并行运行具有类型安全返回类型的任意函数。

关于任意函数参数的另一点是:

我觉得行

  type fxAny = () => Any

应该真的代码为

  type fxAny =  => Any

,但 scala 编译器不允许我这样做。

感谢任何帮助。

最佳答案

Eric Torreborre 在@retronym 提供的链接中写道:

trait LazyParameters { 
  /** transform a value to a zero-arg function returning that value */ 
  implicit def toLazyParameter[T](value: =>T) = new LazyParameter(() => value) 
  /** class holding a value to be evaluated lazily */ 
  class LazyParameter[T](value: ()=>T) { 
    lazy val v = value() 
    def apply() = v 
  } 
} 

这是测试的 LazyParameter 版本:

object ForkTest2 extends LazyParameters {

...

def forkAll(fx1: LazyParameter[Any]*): List[Any] = {
  val results = fx1.toList.map {
    fx: LazyParameter[Any] => Futures.future(fx.apply())}
  Futures.awaitAll(tenMinutes, results: _*)
}

编辑:正如您所注意到的,隐式计算 by-name 参数并且它不会延续计算延迟。为什么不直接使用 future 这个词呢?我个人认为它使代码更具可读性。

import actors.Futures
import actors.Futures.future
import actors.Future

...

def test2 {
  val results = forkAll(
    future {
      Thread.sleep(500)
      println("inside fx1 ",+System.currentTimeMillis)
      true
    },
    future {
      Thread.sleep(1000)
      println("inside fx2 ",+System.currentTimeMillis)
      "stringResult"
    },
    future {
      Thread.sleep(1500)
      println("inside fx3 ",+System.currentTimeMillis)
      1
    },
    future {
      Thread.sleep(2000)
      println("inside fx4 ",+System.currentTimeMillis)
      1.023
    })

  println(results)
  true
}

...

def forkAll(futures: Future[Any]*): List[Any] = {
  println("forkAll")
  Futures.awaitAll(tenMinutes, futures: _*)
}

关于scala - 一个轻量级的 Scala fork join 语法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2084813/

相关文章:

scala - 使用线程局部变量的 Actor 回复会导致错误吗?

c++ - 我可以将 std::transform 与并行执行策略一起使用吗?

scala - 测试 sbt 没有正确读取 java.library.path

node.js - 我们可以在带有工作线程的 NodeJs 中使用多个 cpu 内核实现并行处理吗?

java - 将 Fluent 构建器转换为并行运行

angular - 如何取消订阅 forkJoin 返回的 Observable?

java - 在 Java 中使用 Executor 编写文件

java - 如何使用 Java 8 的 Fork/Join 框架并行化循环

java - 在 AWS 上使用 Kubernetes 创建并运行 docker 容器集群

java - Flyway迁移(多项目): run SBT command for main module,但不是子模块