我有一个类型为 DataStream[(String, somecaseclass)]
的 Flink DataStream。我想对 Tuple
的第一个字段进行分组,即 String
并创建一个 ListBuffer[somecaseclass]
。以下是我尝试过的:
val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
.keyBy(0)
.fold(emptylistbuffer){case(outputbuffer,b) => {outputbuffer+=b._2}}
但是这为我提供了每一行的输出,这意味着如果有 10 个输入行,则第一个输出行仅连接到第一行,第十行为我提供了十行的连接输出。但是,我只想要第十行。我检查了 Flink DataStream
上的几乎所有转换,但没有适合用例的。
输入:
(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))
预期输出:
(filename.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))
最佳答案
DataStream API 认为 DataStream
是无界的。这意味着 DataStream
可能提供无限数量的记录。因此,不可能在收到所有记录后“仅”发出聚合结果(在您的情况下是完整的 ListBuffer
),因为可能有更多记录需要聚合(添加到ListBuffer
)。原则上,DataStream
上的聚合永远不会产生最终结果,因为可能会有更多记录。由于这不太实用,Flink 的 DataStream API 会为每个传入记录生成一个新结果。
在无界流上计算聚合的一种常见方法是窗口。 Windows 在流上定义了有边界的部分,可以在这些部分上计算聚合并发出最终结果。 Flink 提供了基于时间或记录计数的内置窗口。例如,您在 1 小时的滚动窗口上的记录收集功能将收集一小时内到达的所有记录。
请检查 different window types 的 Flink 文档以及如何使用它们。
关于apache-flink - 在 Flink Streaming 中按键分组并收集到一个 ListBuffer 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48009496/