java - Apache 弗林克 : How to count the total number of events in a DataStream

标签 java apache-flink flink-streaming flink-cep

我有两个原始流,我正在加入这些流,然后我想计算已加入的事件总数和未加入的事件总数。我通过使用 joinedEventDataStream 上的 map 来执行此操作,如下所示

joinedEventDataStream.map(new RichMapFunction<JoinedEvent, Object>() {

            @Override
            public Object map(JoinedEvent joinedEvent) throws Exception {

                number_of_joined_events += 1;

                return null;
            }
        });

问题 1:这是计算流中事件数量的适当方法吗?

问题#2:我注意到了一种有线行为,有些人可能不相信。问题是,当我在 IntelliJ IDE 中运行 Flink 程序时,它会显示 number_of_joined_events 的正确值,但在我将此程序作为 jar 提交时显示 0 。因此,当我将程序作为 jar 文件运行时,我得到的是 number_of_joined_events 的初始值,而不是实际计数。为什么只有在提交 jar 文件时才会出现这种情况,而在 IDE 中却不会?

最佳答案

你的方法不起作用。您在通过 JAR 文件执行程序时注意到的行为是预期的。

我不知道number_of_joined_events是如何定义的,但我假设它是程序中的静态变量。当您在 IDE 中运行该程序时,它在单个 JVM 中运行。因此,所有运算符都可以访问静态变量。当您向远程进程提交 JAR 文件时,该程序将在不同的 JVM(可能是多个 JVM)中执行,并且客户端进程中的静态变量永远不会更新。

您可以使用 Flink 的指标或将 1 相加的 ReduceFunction 来计算已处理记录的数量。

关于java - Apache 弗林克 : How to count the total number of events in a DataStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47246721/

相关文章:

apache-flink - Flink Kafka 连接器 0.10.0 事件时间澄清和 ProcessFunction 澄清

java - Flink 流 - 笛卡尔积和流上的窗口

java - Eclipse 想要运行错误的类

java - 使用 BouncyCaSTLe API 进行 RSA 加密

java - 通过 URLClassloader 加载加密的 Jar 文件

java - Flink Tumble Window 触发时间

java - 我可以将 javafx/openjfx 与 OpenJDK 8 一起使用吗?

scala - 找不到记录器的附加程序(org.apache.kafka.clients.consumer.ConsumerConfig)

parallel-processing - Apache 弗林克 : How to execute in parallel but keep order of messages?

apache-kafka - Flink - 如何使用 withTimestampAssigner 从事件负载中获取时间(不使用 Kafka 时间戳)