scala - Akka Streams - 如何检查流是否成功运行?

标签 scala akka future akka-stream

我有一个 Source[ByteString, Any] 类型的源 source 和一个 Sink[ByteString] 类型的接收器 sink , M],其中 M 可以是 Future[IOResult] 或任何内容。当我运行以下命令时:

source.runWith(sink)

我得到M作为结果。我没有得到任何其他有用的信息表明流已成功。如果保证它是 Future[IOResult] 那就太好了,但是接收器来自通用类型类,并且您无法在编译时确定类型。

但是,类型类的所有实例都应报告操作是否成功。

或者我应该将具体化类型包装成自定义类型,例如Result[M],以便我可以轻松操作?这可能需要一个新的类型类,其中 M 的实例必须是其成员(例如,您应该指定 Future[IOResult] 如何转换为 Result[Future[IOResult]]。相同对于所有可能的 M 实例)。

最佳答案

您可以在源上使用终止观察程序,和/或在流上使用 onComplete

source.watchTermination() { (_, done) =>
  done.onComplete {
    case Success(_) => logger.info("source completed successfully")
    case Failure(e) => logger.error(s"source completed with failure : $e")
  }
}
.runWith(sink)
.onComplete{
  case Success(_) => logger.info(s"stream completed successfully")
  case Failure(e) => logger.error(s"stream completed with failure: $e")
}

参见https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html

关于scala - Akka Streams - 如何检查流是否成功运行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56679407/

相关文章:

gwt - Scala 可以用于编写 GWT 应用程序吗?

scala - 计算一行的秩

scala - 如何在查找中隐藏 Akka 远程 Actor ?

scala - Akka 流重试重复结果

architecture - 用于 AKKA Actor 框架?

json - 如何在scala中解析HTTP请求返回的Json数据

scala - 从数据框中选择时重命名列名

Scala Future.find

c++ - 将来尝试引用已删除的函数

multithreading - 为什么这不编译? (RValue 作为线程 CTOR 参数)