scala - FS2 Stream 异常处理不起作用

标签 scala exception-handling functional-programming fs2

我在 FS2 和异常处理方面遇到问题。我想要的是,给定一个 Stream[IO,A],当我使用可以抛出异常的 f: A => B 映射到它时,我得到一个Stream[IO,Either[Throwable,B]]

我尝试了以下方法,它按预期工作:

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt
x1.compile.toVector.unsafeRunSync().foreach(println)

它打印:

Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)

但是,当我尝试对该 Stream 执行任何操作时,我的问题就开始了。

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.map(identity)

x1.compile.toVector.unsafeRunSync().foreach(println)

抛出异常并杀死应用程序:

java.lang.RuntimeException: I don't like 9s
    at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
    at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
    ...

更奇怪的是,使用 takeStream 只返回我认为没问题的元素,仍然以同样的方式爆炸:

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.take(2)

x1.compile.toVector.unsafeRunSync().foreach(println)

有人能解释一下为什么会这样吗?这是错误还是(非)预期的行为?

注意 此行为存在于 FS2 0.10.0-M70.10.0

最佳答案

这里的问题是,要使用fs2,您必须编写纯代码。抛出异常不是纯粹的,因此如果您希望管道中的某个步骤可能失败,则需要将其明确化。这里有两种方式:

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) Left[Throwable, Int](new RuntimeException("I don't like 9s")) else Right(i)}
x1.compile.toVector.unsafeRunSync().foreach(println)
// Explicit Left annotation is so you can .rethrow if desired; it can be omitted or added later with .widen

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .flatMap { i => if(i == 9) Stream.raiseError(new RuntimeException("I don't like 9s")) else Stream.emit(i) }
  .attempt
x1.compile.toVector.unsafeRunSync().foreach(println)

其中第一个是可取的,因为 flatMapemit 将导致 size-1 block ,通常效率较低。如果您想在出现第一个错误时停止处理,请将 .rethrow 添加到流的末尾。

关于scala - FS2 Stream 异常处理不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48749960/

相关文章:

templates - 如何循环遍历 scala 模板中的 flash 元素?

java - 使用 SPARK 从 zip 到 seq

functional-programming - Racket 遍历列表并获取索引

f# - 有人有 --standalone 选项可以在 F# CTP 中工作吗?

scala - 从 Java 属性获取 Scala 映射

scala - 使用嵌套字段更新数据框 - Spark

java - 如何在不使用异常的情况下从方法返回失败详细信息并可选择包含一个值?

java - Spring 3.0 MVC 异常处理程序

python - 如果不立即重新引发异常回溯,则隐藏

javascript - 返回检索成员变量的函数的函数