scala - 从 Akka 流中的 Future 创建背压

标签 scala akka akka-stream

我是 Akka 流和一般流的新手,所以我可能在概念层面完全误解了某些东西,但是有什么方法可以创建背压直到 future 解决?基本上我想做的是这样的:

object Parser {
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)

def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???

从文件中读取字节并流式传输到解析器,解析器发出 Seq ExampleObject的s s,然后将它们流式传输到返回 Future 的异步操作.我想让它直到Future解析,流的其余部分被背压,然后在 Future 解析后恢复,传递另一个 Seq[ExampleObject]doAsyncOp ,恢复背压等。

现在我有这个工作:
Await.result(doAsyncOp(values), 10 seconds)

但我的理解是,这会锁定整个线程并且很糟糕。有没有更好的办法呢?

如果有帮助,大图是我正在尝试使用 Jawn 逐块解析一个非常大的 JSON 文件(太大而无法放入内存),然后将对象传递给 ElasticSearch 以便在解析时对其进行索引 - ElasticSearch 有一个包含 50 个待处理操作的队列,如果溢出,它将开始拒绝新对象。

最佳答案

这很容易。您需要使用 mapAync :)

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))
  .runWith(Sink.seq)

哪里4是并行度。

关于scala - 从 Akka 流中的 Future 创建背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39909303/

相关文章:

java - 将 Akka Iterable 转换为 java.lang.Iterable?

java - Akka : How to get actor causing exception on supervisor

scala - 如何在 Actor 中停止 Source.tick?

Java-Akka : Assemble message from multiple actors

swing - Scala Swing 组件大小调整

database - 用分支链接 DBIOAction,一个分支什么都不做

scala - 为什么 Scala 在与 @ 进行模式匹配时不推断类型参数

scala - List.filter 中的下划线

java - 传播 Akka 的 PoisonPill 消息

transactions - Akka 流和事务边界