scala - 将 Scala @suspendable 方法转换为 Future

标签 scala continuations

假设我有一个 sleep 功能:

def sleep(delay:Int) : Unit @suspendable = {
  ....
}

是否有可能有一个函数 future 来创建可以同步等待的 sleep 函数的异步版本。
def future(targetFunc: (Int => Unit @suspendable)) : (Int => Future) = {
    ....
}

class Future {
  def await : Unit @suspendable = {
     ....
  }
}

你应该能够做这样的事情:
reset {
  val sleepAsync = future(sleep)
  val future1 = sleepAsync(2000)
  val future2 = sleepAsync(3000)
  future1.await
  future2.await
  /* finishes after a delay of 3000 */
}

对 sleepAsync 的两次调用应该会立即返回,对 Future#await 的两次调用应该会出现阻塞。当然他们都真的在reset结束后掉了下来,后面的代码负责调用延迟后的continuation。

否则是否有另一种方法可以并行运行两个@suspendable 函数并等待它们都完成?

我有一个可编译的要点,其中包含我想做的事情的骨架:https://gist.github.com/1191381

最佳答案

object Forks {

  import scala.util.continuations._

  case class Forker(forks: Vector[() => Unit @suspendable]) {
    def ~(block: => Unit @suspendable): Forker = Forker(forks :+ (() => block))
    def joinIf(pred: Int => Boolean): Unit @suspendable = shift { k: (Unit => Unit) =>
      val counter = new java.util.concurrent.atomic.AtomicInteger(forks.size)
      forks foreach { f =>
        reset {
          f()
          if (pred(counter.decrementAndGet)) k()
        }
      }
    }
    def joinAll() = joinIf(_ == 0)
    def joinAny() = joinIf(_ == forks.size - 1)
  }

  def fork(block: => Unit @suspendable): Forker = Forker(Vector(() => block))
}

使用 fork(),我们现在可以等待许多“可暂停”。使用 ~() 将可悬浮物链接在一起。使用 joinAll() 等待所有 suspendable 并使用 joinAny() 仅等待一个。使用 joinIf() 自定义连接策略。
object Tests extends App {

  import java.util.{Timer, TimerTask}
  import scala.util.continuations._

  implicit val timer = new Timer

  def sleep(ms: Int)(implicit timer: Timer): Unit @suspendable = {
    shift { k: (Unit => Unit) =>
      timer.schedule(new TimerTask {
        def run = k()
      }, ms)
    }
  }

  import Forks._

  reset {
    fork {
      println("sleeping for 2000 ms")
      sleep(2000)
      println("slept for 2000 ms")
    } ~ {
      println("sleeping for 4000 ms")
      sleep(4000)
      println("slept for 4000 ms")
    } joinAll()
    println("and we are done")
  }
  println("outside reset")
  readLine
  timer.cancel
}

这是输出。程序在时间 T 开始:
sleeping for 2000 ms
sleeping for 4000 ms
outside reset         <<<<<< T + 0 second
slept for 2000 ms     <<<<<< T + 2 seconds
slept for 4000 ms     <<<<<< T + 4 seconds
and we are done       <<<<<< T + 4 seconds

关于scala - 将 Scala @suspendable 方法转换为 Future,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7294252/

相关文章:

haskell - 连续单子(monad)转变

ruby - 延续可以用来代替递归吗?

functional-programming - 理解(子、部分、完整、一次性)延续(程序语言)

方案早期 "short circuit return"?

scala - 带点 Spark 的列名

scala - StringOps 和 WrappedString 实例之间的相等性

scala - 从 playframework 2.0 向网络服务发出 GET 请求

foreach - for-each 中的方案延续

scala - 如何在 Circe 的 Scala 中使用泛型?

scala - [Scala][Spark] : transform a column in dataframe, 保留其他列,使用 withColumn 和 map [错误:缺少参数类型]