scala - 使用 akka-stream 过滤异步

标签 scala akka akka-stream

是否有等效于 mapAsync()方法,但对于 filter ?

下面是一个使用伪代码的例子:

val filter: T => Future[Boolean] = /.../

source.filter(filter).runWith(/.../)
       ^^^^^^

谢谢

最佳答案

我认为没有Flow的直接方法或 Source具有您正在寻找的功能,但可用方法的组合将为您提供所需的功能:

def asyncFilter[T](filter: T => Future[Boolean], parallelism : Int = 1)
                  (implicit ec : ExecutionContext) : Flow[T, T, _] =
  Flow[T].mapAsync(parallelism)(t => filter(t).map(_ -> t))
         .filter(_._1)
         .map(_._2)

关于scala - 使用 akka-stream 过滤异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50951089/

相关文章:

Scala 函数文字和未绑定(bind)占位符参数

ruby-on-rails - 高性能 REST API - 哪种语言/堆栈?

multithreading - 使用 akka 进行日志记录。应用程序设计

scala - 如何突然停止 akka 流 Runnable Graph?

scala - 如何在 akka-streams map 与 mapAsync 中使用异步驱动程序

java - 如何在 Sql 中转义分号

java - java返回类型可以像Scala那样写 this.type

scala - Akka中询问和告诉之间的区别?

scala - Akka流源-Cassandra结果集

scala - 如何发布或订阅物化的 Akka Stream 流图?