scala - 跳过无限流中的错误

标签 scala functional-programming scala-cats fs2

我有一个无限fs2.Stream这可能会遇到错误。我想跳过这些错误而不做任何事情(可能是日志)并继续流式传输更多元素。例子:

//An example
val stream = fs2.Stream
    .awakeEvery[IO](1.second)
    .evalMap(_ => IO.raiseError(new RuntimeException))
在这种特定情况下,我想获得无限 fs2.StreamLeft(new RuntimeException)每秒发射一次。
有一个Stream.attempt产生在遇到第一个错误后终止的流的方法。有没有办法跳过错误并继续拉取更多元素?IO.raiseError(new RuntimeException).attempt通常不会起作用,因为它需要在流管道组合的所有位置尝试所有效果。

最佳答案

没有办法以您描述的方式处理错误。
当流遇到第一个错误时,它被终止。请检查此 gitter question .
您可以通过两种方式处理它:

  • 尝试效果(但您已经提到在您的情况下这是不可能的)。
  • 流终止后重新启动流:
  • val stream: Stream[IO, Either[Throwable, Unit]] = Stream
        .awakeEvery[IO](1.second)
        .evalMap(_ => IO.raiseError(new RuntimeException))
        .handleErrorWith(t => Stream(Left(t)) ++ stream) //put Left to the stream and restart it
    
    //since stream will infinetelly restart I take only 3 first values
    println(stream.take(3).compile.toList.unsafeRunSync()) 
    

    关于scala - 跳过无限流中的错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62904308/

    相关文章:

    scala - Scala 中的惯用 Haskell 式迭代?

    scala - Akka SLF4J 和 Scala 中的 logback

    用于分组的 Scala 集合,同时保持顺序

    scala - 为什么可以在其定义范围之外看到对象的 protected 内部类?

    scala - 函数式根据 Scala 中的值创建列表

    functional-programming - `abstract` 中的 `interface definition` 是什么意思?

    scala - 如何为scala子类型实现流畅的接口(interface)?

    java - RxJava : how to compose multiple Observables with dependencies and collect all results at the end?

    scala - 猫效应: Avoid calling unsafeRun on external listener of T => Unit