apache-flink - 如何使用Flink时态表?

标签 apache-flink

Flink 中的新时态表看起来棒极了,但我还无法让它们发挥作用。由于我找不到任何有效的示例,我想知道是否有其他人可以让它工作并可以指出我做错了什么。

这里有一些背景:

查询:

SELECT s.id FROM sitemembership AS m, LATERAL TABLE (site(m.ts)) AS s WHERE m.siteId = s.id

设置:

// { "streamName": "sitemembership", "key": "siteId" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
table.printSchema();
tableEnv.registerTable(streamName, table);

// { "streamName": "site", "key": "id" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
TemporalTableFunction temporalTable = table.createTemporalTableFunction("ts", key);
tableEnv.registerFunction(streamName, temporalTable);

我没有收到任何行,也没有错误。我尝试通过更改注册为临时表的表来翻转查询,但没有成功。我还查看了“ts”列并获取了日期,这让我相信我应该至少获得几行。

感谢任何帮助。

附注我正在对来自 kafka 的历史数据运行此操作,该历史数据在“id”上分区,这也是行键

最佳答案

您可以以测试的形式找到完全有效的代码“示例”here (这两个测试的内容(处理时间和事件时间)或多或少在文档 hereherehere 中重复)。您可以从这些示例开始,然后逐步将它们转换为您的确切用例/场景。首先从预定义的数据集开始,然后再切换到从 Kafka 读取数据可能会有所帮助。

关于您的问题,从您的代码片段中不清楚出了什么问题,一些潜在的问题:

  • 水印未分配/不增加( assignTimestampsAndWatermarks() 在链接的 testEventTimeInnerJoin() 中调用)。 Temporal Join 运算符仅在水印上发出数据。
  • 您尝试连接的两个表之间的行时间不同步。如果site没有足够旧的行可以与 sitemembership 连接记录,结果将为空。例如,如果来自 site 的所有记录有年份 2019 的时间字段,而sitemembership仅具有 2018 的记录.

关于apache-flink - 如何使用Flink时态表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54441594/

相关文章:

apache-flink - Flink流: Data stream that gets controlled by control stream

apache-flink - 弗林克 : cannot cancel a running job (streaming)

digital-ocean - 在DCOS中安装Flink出错

apache-flink - NOT followBy 的 Apache Flink CEP 模式操作

apache-flink - Flink 键控流中记录的排序

apache-flink - 在 Flink 中检查点时,计时器过多会花费太多时间

apache-flink - 失败消息 : Checkpoint expired before completing when using apache flink 1. 11

kubernetes - Flink Statefun HA Kubernetes集群

java - KafkaAvroDeserializer 因 Kyro 异常而失败

scala - Apache 弗林克 : Count window with timeout