学习 Akka 流。我有一个记录流,每个时间单位很多,已经按时间排序(来自 Slick),我想通过检测时间步长何时更改将它们分批处理到时间组中进行处理。
示例
case class Record(time: Int, payload: String)
如果传入流是
Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...
我想把它变成
Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
到目前为止,我只发现按固定数量的记录进行分组,或者分成许多子流,但从我的角度来看,我不需要多个子流。
更新:我找到了batch
,但它看起来更关心背压,而不仅仅是一直批处理。
最佳答案
statefulMapConcat
是 Akka Streams 库中的多功能工具。
val records =
Source(List(
Record(1, "a"),
Record(1, "k"),
Record(1, "k"),
Record(1, "a"),
Record(2, "r"),
Record(2, "o"),
Record(2, "c"),
Record(2, "k"),
Record(2, "s"),
Record(3, "!")
))
.concat(Source.single(Record(0, "notused"))) // needed to print the last element
records
.statefulMapConcat { () =>
var currentTime = 0
var payloads: Seq[String] = Nil
record =>
if (record.time == currentTime) {
payloads = payloads :+ record.payload
Nil
} else {
val previousState = (currentTime, payloads)
currentTime = record.time
payloads = Seq(record.payload)
List(previousState)
}
}
.runForeach(println)
运行上面的命令打印如下:
(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))
您可以调整示例以打印 Batch
对象。
关于scala - Akka 流批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58471183/