apache-flink - 在 Flink Streaming 中按键分组并收集到一个 ListBuffer 中

标签 apache-flink flink-streaming

我有一个类型为 DataStream[(String, somecaseclass)] 的 Flink DataStream。我想对 Tuple 的第一个字段进行分组,即 String 并创建一个 ListBuffer[somecaseclass]。以下是我尝试过的:

val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
  .keyBy(0)
  .fold(emptylistbuffer){case(outputbuffer,b) => {outputbuffer+=b._2}}

但是这为我提供了每一行的输出,这意味着如果有 10 个输入行,则第一个输出行仅连接到第一行,第十行为我提供了十行的连接输出。但是,我只想要第十行。我检查了 Flink DataStream 上的几乎所有转换,但没有适合用例的。

输入:

(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))

预期输出:

(filename.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))

最佳答案

DataStream API 认为 DataStream 是无界的。这意味着 DataStream 可能提供无限数量的记录。因此,不可能在收到所有记录后“仅”发出聚合结果(在您的情况下是完整的 ListBuffer),因为可能有更多记录需要聚合(添加到ListBuffer)。原则上,DataStream 上的聚合永远不会产生最终结果,因为可能会有更多记录。由于这不太实用,Flink 的 DataStream API 会为每个传入记录生成一个新结果。

在无界流上计算聚合的一种常见方法是窗口。 Windows 在流上定义了有边界的部分,可以在这些部分上计算聚合并发出最终结果。 Flink 提供了基于时间或记录计数的内置窗口。例如,您在 1 小时的滚动窗口上的记录收集功能将收集一小时内到达的所有记录。

请检查 different window types 的 Flink 文档以及如何使用它们。

关于apache-flink - 在 Flink Streaming 中按键分组并收集到一个 ListBuffer 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48009496/

相关文章:

hadoop-yarn - 从 YARN 上的另一个程序启动的 Flink 作业失败,错误为 "JobClientActor seems to have died"

scala - Flink : No operators defined in streaming topology.无法执行

azure - 当我运行 flink 作业将数据存储到 Azure Data Lake 时,出现以下异常。有人可以指导我吗?

apache-flink - Flink 中是否弃用了 JSONDeserializationSchema()?

monitoring - 我如何覆盖 Apache Flink 中的配置值?

hadoop - Flink - AWS EMR 中的 AWSS3IOException 由带有 S3A 的 BucketingSink 引起

java - 使用 Flink 时,字数统计数字总是在变化

java - 在另一个数据流中创建新的数据流

apache-flink - 命名要在 Flink 执行计划 UI 中显示的运算符、源、接收器和模式