scala - Monix Task.sleep 和单线程执行

标签 scala monix

我正在尝试理解 Monix 中的任务调度原则。 以下代码(来源:https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3)按预期仅生成“1”。

  val s1: Scheduler = Scheduler(
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
    ExecutionModel.SynchronousExecution)

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)

  val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled

  prog.runToFuture(s1)

  // Output:
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // ...

当我们在repeat方法中添加Task.sleep

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >>
      Task.sleep(1.millis) >> repeat(id)

输出变为

// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...

这两个任务现在在一个线程上同时执行!好的 :) 一些合作屈服已经开始。这里究竟发生了什么?谢谢:)

编辑:Task.shift 而不是 Task.sleep

最佳答案

我不确定这是否是您正在寻找的答案,但它是这样的:

尽管命名方式不同,Task.sleep 无法与更传统的方法(如 Thread.sleep)进行比较。

Task.sleep 实际上并不在线程上运行,而是简单地指示调度程序在耗时后运行回调。

这是来自 monix/TaskSleep.scala 的一小段代码比较:

[...]

implicit val s = ctx.scheduler
val c = TaskConnectionRef()
ctx.connection.push(c.cancel)

c := ctx.scheduler.scheduleOnce(
  timespan.length,
  timespan.unit,
  new SleepRunnable(ctx, cb)
)

[...]

private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {

  def run(): Unit = {
    ctx.connection.pop()
    // We had an async boundary, as we must reset the frame
    ctx.frameRef.reset()
    cb.onSuccess(())
  }
}

[...]

在执行回调(此处:cb)之前,您的单线程调度程序(此处:ctx.scheduler)可以简单地使用他的线程进行任何操作计算排在下一个队列中。

这也解释了为什么这种方法更可取,因为我们不会在 sleep 间隔期间阻塞线程 - 浪费更少的计算周期。

希望这对您有所帮助。

关于scala - Monix Task.sleep 和单线程执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57426373/

相关文章:

scala - scala编译出错,为什么是: val num =123;println(num. getClass())

scala - 在 Spark-Streaming 和 Cassandra (Scala) 中使用 future

scala - 移位在复位 block 中的位置是否重要?

scala - 复制 Spark Row N 次

java - Scala 任务返回映射

monix - 将 Observable 写入文件

scala - 密封特性中的方法在类文件中给出重复的字段名称和签名

scala - 使用流建模多个函数调用(以安全的 FP 方式)

scala - Monix 并行任务上的错误处理(使用 parMap)

scala - 如何处理 monix onErrorHandle 中抛出的未处理异常