scala - 将元素从队列中取出时写入文件 : Scala fs2 Stream

标签 scala io queue fs2

我对 fs2 流、进程元素进行了一个小测试,等待然后将它们写入文件。 我收到类型错误提示,但我无法弄清楚它的含义:

错误:需要:fs2.Stream[[x]cats.effect.IO[x],Unit] => fs2.Stream[[+A]cats.effect.IO[A],Unit], 发现:[F[_]]fs2.Pipe[F,Byte,Unit] 导入 java.nio.file.Paths

import cats.effect.{Blocker, ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.io
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random

class StreamTypeIntToDouble(q: Queue[IO, Int])(implicit timer: Timer[IO]) {
  import core.Processing._

  val blocker: Blocker =
    Blocker.liftExecutionContext(
      scala.concurrent.ExecutionContext.Implicits.global
    )
  def storeInQueue: Stream[IO, Unit] = {

    Stream(1, 2, 3)
      .covary[IO]
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
      .metered(Random.between(1, 20).seconds)
      .through(q.enqueue)

  }
  def getFromQueue: Stream[IO, Unit] = {
    q.dequeue
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
      .through(
        io.file
          .writeAll(Paths.get("file.txt"), blocker)
      )

  }
}

object Five extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, Int](10)
      b = new StreamTypeIntToDouble(q)
      _ <- b.storeInQueue.compile.drain.start
      _ <- b.getFromQueue.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}

最佳答案

这里有几个问题,第一个是最令人困惑的。 writeAll 在其上下文 F[_] 中是多态的,但它需要 FContextShift 实例(以及作为同步)。您当前在范围内没有 ContextShift[IO],因此编译器不会推断 writeAllF 应该是 IO。如果你添加这样的东西:

implicit val ioContextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)

...然后编译器将按照您的预期推断IO

我对这种情况的建议是跳过类型推断。用类型参数写出来只是略微更冗长:

  .through(
    io.file
      .writeAll[IO](Paths.get("file.txt"), blocker)
  )

…这意味着您将获得有用的错误消息,例如缺少类型类实例。

一旦您解决了这个问题,就会出现其他几个问题。其次是在此上下文中使用 evalMap 意味着您将拥有一个 () 值流。如果将其更改为 evalTap,日志记录副作用仍会适当发生,但您不会丢失调用它的流的实际值。

最后一个问题是 writeAll 需要一个字节流,而您已经给它一个 Int 流。你想如何处理这种差异取决于预期的语义,但为了举例,像 .map(_.toByte) 这样的东西会编译。

关于scala - 将元素从队列中取出时写入文件 : Scala fs2 Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62447670/

相关文章:

scala - 无法解析任意符号

scala - Scala 中的磁盘持久化延迟缓存列表™

scala - Spark 壳 : How to copy multiline inside?

performance - 读取未格式化文件的最有效方法

python - 如何创建内存文件对象

php - 异步运行 PHP 任务

我的数据库中的 Scala Slick 和复杂类型

c - 是否可以将事件文件描述符与中断驱动输入结合使用?

c# - 线程队列进程

ios - 为什么使用 dispatch_async 比完全不使用它要慢?