Flink 中的新时态表看起来棒极了,但我还无法让它们发挥作用。由于我找不到任何有效的示例,我想知道是否有其他人可以让它工作并可以指出我做错了什么。
这里有一些背景:
查询:
SELECT s.id FROM sitemembership AS m, LATERAL TABLE (site(m.ts)) AS s WHERE m.siteId = s.id
设置:
// { "streamName": "sitemembership", "key": "siteId" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
table.printSchema();
tableEnv.registerTable(streamName, table);
// { "streamName": "site", "key": "id" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
TemporalTableFunction temporalTable = table.createTemporalTableFunction("ts", key);
tableEnv.registerFunction(streamName, temporalTable);
我没有收到任何行,也没有错误。我尝试通过更改注册为临时表的表来翻转查询,但没有成功。我还查看了“ts”列并获取了日期,这让我相信我应该至少获得几行。
感谢任何帮助。
附注我正在对来自 kafka 的历史数据运行此操作,该历史数据在“id”上分区,这也是行键
最佳答案
您可以以测试的形式找到完全有效的代码“示例”here (这两个测试的内容(处理时间和事件时间)或多或少在文档 here 和 here 或 here 中重复)。您可以从这些示例开始,然后逐步将它们转换为您的确切用例/场景。首先从预定义的数据集开始,然后再切换到从 Kafka 读取数据可能会有所帮助。
关于您的问题,从您的代码片段中不清楚出了什么问题,一些潜在的问题:
- 水印未分配/不增加(
assignTimestampsAndWatermarks()
在链接的testEventTimeInnerJoin()
中调用)。 Temporal Join 运算符仅在水印上发出数据。 - 您尝试连接的两个表之间的行时间不同步。如果
site
没有足够旧的行可以与sitemembership
连接记录,结果将为空。例如,如果来自site
的所有记录有年份2019
的时间字段,而sitemembership
仅具有2018
的记录.
关于apache-flink - 如何使用Flink时态表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54441594/