在我们的流应用程序中,使用 Flink 1.55 及其表 API,我需要检测和处理延迟元素。我无法找到 DataStream API .sideOutputLateData(...) 功能的替代方案
我尝试在 Flink 文档中搜索 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi.html谷歌了很多,没有发现什么有用的
示例:
table
.window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow)
.groupBy(..fieds list)
.select(..fields)
提供的代码按预期工作。问题是,迟到的元素(由窗口大小和允许的迟到时间定义)将被丢弃。 有没有办法通过 Table API 本地处理这些后期元素?
最佳答案
从 Flink 1.8.0 开始,Table API 目前似乎不直接支持此功能。解决此问题的一种方法是将表转换为 DataStream[Row]
并在其上设置侧面输出:
val outputTag = OutputTag[String]("side-output")
val flink = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(flink)
// Make sure the source emits data to the selected side output
tableEnv.registerTableSource(...)
val table = tableEnv.sqlQuery("QUERY")
// Can also be toAppendStream, depending on the underlying table output
val dataStream = tableEnv.toRetractStream(table)
val sideOutputStream = dataStream.getSideOutput(outputTag)
关于scala - 如何处理 Flink 的 Table API 窗口中的延迟元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56667154/