java - 为两个输入流分配时间戳和水印,稍后使用 'EventTime' 连接以进行动态警报

标签 java scala apache-flink

我有一个 Flink 程序,它接受两个流,即数据/传感器读数流和警报规则流。我正在广播规则流并将其连接到数据流以生成动态警报。 ProcessingTime 一切正常,但 EventTime 无效。 我已经为我的数据流分配了时间戳和水印,并按原样传递规则流(因为规则流只有在看到新的附加规则/更新时才会有记录)。但不会生成任何警报。

  1. 当连接两个流(一个带有时间戳和水印,一个仅包含规则(广播))并根据规则进行动态处理时,如何使用“EventTime”生成警报。

  2. 我是否也需要为我的规则流分配时间戳和水印?

  3. 因为我的规则流只有在有任何添加/修改时才会有记录。是否有任何解决方法或黑客可以避免/克服这种情况?

如有任何帮助/建议,我们将不胜感激。

-- 我试过的! 我尝试只使用一个流,即数据流,通过使用硬编码窗口规则生成警报。它工作正常。但是当我将它与规则流连接时,它无法生成任何警报/输出。

“ProcessingTime”一切正常,但“EventTime”不正常。

--我所期望的! 当我将连续数据流与非连续规则流连接时,我希望我的程序能够使用“EventTime”生成动态警报。

最佳答案

Flink 培训中的这个练习恰好涵盖了这种情况:https://training.ververica.com/exercises/taxiQuery.html .有关详细信息,请参阅提示和解决方案,但那里采用的方法是在流上使用此时间戳提取器/水印生成器并遵循以下规则:

// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our query stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.
public static class QueryStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return 0;
    }
}

这具有让另一个流完全负责水印的效果,这正是本例中所需要的。

关于java - 为两个输入流分配时间戳和水印,稍后使用 'EventTime' 连接以进行动态警报,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57585528/

相关文章:

java - SpringApplicationConfiguration 无法解析为类型

java - 从 NDK 读取 Settings.Secure -decodeIndirectRef 中的间接引用无效

java - OSGI:重新启动 bundle 后,激活和绑定(bind)方法会更改启动顺序

apache-kafka - 如何在 Kafka 0.10.1.0 中使用 Flink?

java - 如何让回文程序不识别空格并以句点结尾?

scala - Play Framework的ScalaRouting中的依赖注入(inject)路由器和静态路由器有什么区别?

scala - 由 spark 编写的 Parquet 文件中的 Athena/Hive 时间戳

java - Neo4j - 使用 Scala/Java API 检索时限制节点数量

java - Flink - 多源集成测试

java - 弗林克 : Declaring dynamic tuple size & type