scala - Akka 流批处理

标签 scala akka akka-stream

学习 Akka 流。我有一个记录流,每个时间单位很多,已经按时间排序(来自 Slick),我想通过检测时间步长何时更改将它们分批处理到时间组中进行处理。

示例

case class Record(time: Int, payload: String)

如果传入流是

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

我想把它变成

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...

到目前为止,我只发现按固定数量的记录进行分组,或者分成许多子流,但从我的角度来看,我不需要多个子流。

更新:我找到了batch ,但它看起来更关心背压,而不仅仅是一直批处理。

最佳答案

statefulMapConcat是 Akka Streams 库中的多功能工具。

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        payloads = payloads :+ record.payload
        Nil
      } else {
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)

运行上面的命令打印如下:

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

您可以调整示例以打印 Batch 对象。

关于scala - Akka 流批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58471183/

相关文章:

scala 更好的映射 getOrElse 语法

scala - akka Stream 将 akka-htpp Web 请求调用集成到流中

akka - 在 spray-json 中解码嵌套的 json

scala - 优雅地停止 Akka Stream

java - 已建立的连接上的 TLS

scala - 有没有办法使用 Akka-Stream 获得可预测的 Actor 命名?

class - 即使是 Scala 中简单的序列化示例也不起作用。为什么?

function - 对常量值使用def vs. val有什么含义

scala - NoClassDefFoundError:使用Spark读取s3数据时出现org/apache/hadoop/fs/StreamCapabilities

akka - Actor 的行为应该在什么时候 split