Scala Cats效果-IO异步移位-如何工作?

标签 scala functional-programming monads scala-cats io-monad

这是一些使用IO Monad的Scala猫代码:

import java.util.concurrent.{ExecutorService, Executors}

import cats.effect.IO

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

object Program extends App {

  type CallbackType = (Either[Throwable, Unit]) => Unit

  // IO.async[Unit] is like a Future that returns Unit on completion.
  // Unlike a regular Future, it doesn't start to run until unsafeRunSync is called.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    // "callback" is a function that either takes a throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        val nothing: Unit = toRun() // Note: This line executes the body and returns nothing, which is of type Unit.
        try {
          callback(Right(nothing)) // On success, the callback returns nothing
        } catch {
          case NonFatal(t) => callback(Left(t)) // On failure, it returns an exception
        }
      }
    })
  }

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

  val treadPool: ExecutorService = Executors.newSingleThreadExecutor()
  val mainThread: Thread = Thread.currentThread()

  val Global: ExecutionContextExecutor = ExecutionContext.global

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
   */
  val program = for {
    _ <- IO {
      println("1 Hello World printed synchronously from Main." + Thread.currentThread().getName) // "main" thread
    }
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- forkSync { () =>
      println("Hello World printed synchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- forkAsync { () =>
      println("Hello World printed asynchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- IO {
      println("2 Hello World printed synchronously from Global ." + Thread.currentThread().getName) // "scala-execution-context-global-13" thread
    }
  } yield ()

  program.unsafeRunSync()
}

要运行它,您需要添加:
libraryDependencies ++= Seq(
  "org.typelevel" %% "cats" % "0.9.0",
  "org.typelevel" %% "cats-effect" % "0.3"
),

到您的build.sbt文件。

注意输出:
  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
 */

基本上,我不了解IO.shift(Global)或IO.async的工作方式。

例如,为什么在我调用“forkAsync”之后,如果不调用“IO.shift(Global)”,则随后的同步IO对象将在“pool-1-thread-1”中运行。另外,在此示例中,forkAsync和forkSync有什么区别?它们都从ExecutionContext.global开始,然后在“pool.pool-1-thread-1”中执行Runnable。

就像forkAsync和forkSync做完全一样的事情还是forkAsync做不同的事情一样?如果他们在做同样的事情,那么在IO.async中包装代码有什么意义呢?如果他们做的不是同一件事,他们会有什么不同?

最佳答案

For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1".



更为重要的问题是,为什么您希望它在全局上评估“后续同步IO对象”?
IO在内部没有线程池的概念,也不了解global,因此它无法移回到默认的线程池,因此确实需要触发手动移位。

升级到最新版本的1.0.0,并且evalOn中也有ContextShift,它将在指定的线程池上执行IO操作,然后移回您的“全局”,我想这就是您想要的。

Also, what is the difference between forkAsync and forkSync in this example?



您的forkSync触发Runnable的执行,但不等待其完成。这是一场大火,忘记了。这意味着后续的链式 Action 将不会产生反压力。

一些忠告:
  • 升级到最新版本(1.0.0)
  • 在以下位置阅读文档:https://typelevel.org/cats-effect/datatypes/io.html
  • 关于Scala Cats效果-IO异步移位-如何工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52437675/

    相关文章:

    hibernate - 哪个是适用于 Scala 和 PostgreSQL 的 Play 框架的最佳数据访问选项?

    java - 导致堆空间用尽的可能原因是什么?是否有避免它的最佳实践?

    Scala案例类禁止按名称调用参数?

    python - 为什么 python reduce() 对 `None` 元素的行为不同?

    performance - StateT over Reader 和 ReaderT over State 之间有什么显着区别吗?

    scala - Play Framework 测试助手需要隐式 `Materializer`

    .net - MailboxProcessor 性能问题

    function - 如何在 Haskell 中从 a -> IO b 生成 IO (a->b) 函数

    haskell - 'return' 关键字有什么特别之处

    scala - 在 Scala foreach 循环中赋值