我有一个 Source[ByteString, Any]
类型的源 source
和一个 Sink[ByteString] 类型的接收器
,其中 sink
, M]M
可以是 Future[IOResult]
或任何内容。当我运行以下命令时:
source.runWith(sink)
我得到M
作为结果。我没有得到任何其他有用的信息表明流已成功。如果保证它是 Future[IOResult] 那就太好了,但是接收器来自通用类型类,并且您无法在编译时确定类型。
但是,类型类的所有实例都应报告操作是否成功。
或者我应该将具体化类型包装成自定义类型,例如Result[M]
,以便我可以轻松操作?这可能需要一个新的类型类,其中 M 的实例必须是其成员(例如,您应该指定 Future[IOResult]
如何转换为 Result[Future[IOResult]]
。相同对于所有可能的 M
实例)。
最佳答案
您可以在源上使用终止观察程序,和/或在流上使用 onComplete
。
source.watchTermination() { (_, done) =>
done.onComplete {
case Success(_) => logger.info("source completed successfully")
case Failure(e) => logger.error(s"source completed with failure : $e")
}
}
.runWith(sink)
.onComplete{
case Success(_) => logger.info(s"stream completed successfully")
case Failure(e) => logger.error(s"stream completed with failure: $e")
}
参见https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html
关于scala - Akka Streams - 如何检查流是否成功运行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56679407/