scala - FS2 流运行直到 InputStream 末尾

标签 scala functional-programming scala-cats fs2

我对 FS2 很陌生,需要一些有关设计的帮助。我正在尝试设计一个流,它将从底层的 InputStream 中提取 block ,直到结束。这是我尝试过的:

import java.io.{File, FileInputStream, InputStream}

import cats.effect.IO
import cats.effect.IO._

object Fs2 {

  def main(args: Array[String]): Unit = {
    val is = new FileInputStream(new File("/tmp/my-file.mf"))
    val stream = fs2.Stream.eval(read(is))
    stream.compile.drain.unsafeRunSync()
  }

  def read(is: InputStream): IO[Array[Byte]] = IO {
    val buf = new Array[Byte](4096)
    is.read(buf)
    println(new String(buf))
    buf
  }
}

程序打印唯一的第一个 block 。这是合理的。但我想找到一种方法来“指示”哪里停止阅读以及哪里不停止。我的意思是继续调用 read(is) 直到结束。有办法实现吗?

我也尝试过repeatEval(read(is)),但它一直在读取......我需要介于两者之间的东西。

最佳答案

使用fs2.io.readInputStreamfs2.io.readInputStreamAsync 。前者阻塞当前线程;后者会阻塞 ExecutionContext 中的线程。例如:

val is: InputStream = new FileInputStream(new File("/tmp/my-file.mf"))
val stream = fs2.io.readInputStreamAsync(IO(is), 128)

关于scala - FS2 流运行直到 InputStream 末尾,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50515613/

相关文章:

Scala - 元组上的 Future.sequence

scala - 使用 Akka actors 进行异常处理

scala - 在 Scala 函数中处理副作用和返回值的惯用方法

scala - 在猫中实现 while(true)

scala - 更新不可变对象(immutable对象)

scala - 如何将 Spark DataFrame 以 csv 格式保存在磁盘上?

Haskell 新手问题 : What's wrong with my append function?

java - 纯函数可以改变输入值吗?

javascript - Typescript/React 功能组件的 props 功能 - 什么类型?

scala - 使用cats.effect时,值flatMap不是类型参数F[Long]的成员