scala - 如何处理 Flink 的 Table API 窗口中的延迟元素?

标签 scala apache-flink

在我们的流应用程序中,使用 Flink 1.55 及其表 API,我需要检测和处理延迟元素。我无法找到 DataStream API .sideOutputLateData(...) 功能的替代方案

我尝试在 Flink 文档中搜索 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi.html谷歌了很多,没有发现什么有用的

示例:

table
  .window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow)
  .groupBy(..fieds list)
  .select(..fields)

提供的代码按预期工作。问题是,迟到的元素(由窗口大小和允许的迟到时间定义)将被丢弃。 有没有办法通过 Table API 本地处理这些后期元素?

最佳答案

从 Flink 1.8.0 开始,Table API 目前似乎不直接支持此功能。解决此问题的一种方法是将表转换为 DataStream[Row] 并在其上设置侧面输出:

val outputTag = OutputTag[String]("side-output")

val flink = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(flink)

// Make sure the source emits data to the selected side output
tableEnv.registerTableSource(...)
val table = tableEnv.sqlQuery("QUERY")

// Can also be toAppendStream, depending on the underlying table output
val dataStream = tableEnv.toRetractStream(table)
val sideOutputStream = dataStream.getSideOutput(outputTag)

关于scala - 如何处理 Flink 的 Table API 窗口中的延迟元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56667154/

相关文章:

java - 弗林克 : Wrap executable non-flink jar to run it in a flink cluster

apache-flink - 有没有办法确定总作业并行度或运行 Flink 作业所需的插槽数量(在运行之前)

apache-flink - 在 Flink Streaming 中按键分组并收集到一个 ListBuffer 中

elasticsearch - 单个flink管道的多个Elasticsearch接收器

scala - 在测试中获取用于在 Akka 中创建 actor 的类实例

scala - Scala Play Controller 中的 Finagle 客户端

scala - 从Spark将地理数据插入Elasticsearch

apache-flink - Flink Streaming : From one window, 在另一个窗口中查找状态

scala - 在 Sbt 中包含 Spark 包

scala - Scala 中是否有任何方法可以将元组转换为列表?