是否有等效于 mapAsync()
方法,但对于 filter
?
下面是一个使用伪代码的例子:
val filter: T => Future[Boolean] = /.../
source.filter(filter).runWith(/.../)
^^^^^^
谢谢
最佳答案
我认为没有Flow
的直接方法或 Source
具有您正在寻找的功能,但可用方法的组合将为您提供所需的功能:
def asyncFilter[T](filter: T => Future[Boolean], parallelism : Int = 1)
(implicit ec : ExecutionContext) : Flow[T, T, _] =
Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
.filter(_._1)
.map(_._2)
关于scala - 使用 akka-stream 过滤异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50951089/