apache-flink - 如何统计给定时间窗口内Apache Flink处理的记录数

标签 apache-flink flink-streaming

在flink中定义一个时间窗口后如下:

val lines = socket.timeWindowAll(Time.seconds(5))

如何计算 5 秒的特定窗口中的记录数?

最佳答案

执行计数聚合的最有效方法是 ReduceFunction。但是,reduce 有输入和输出类型必须相同的限制。因此,在应用窗口之前,您必须将输入转换为 Int:

val socket: DataStream[(String)] = ???

val cnts: DataStream[Int] = socket
  .map(_ => 1)                    // convert to 1
  .timeWindowAll(Time.seconds(5)) // group into 5 second windows
  .reduce( (x, y) => x + y)       // sum 1s to count

关于apache-flink - 如何统计给定时间窗口内Apache Flink处理的记录数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45606999/

相关文章:

java - 类型删除和 Flink : what causes run time error?

apache-flink - Apache Flink 动态设置JVM_OPT env.java.opts

apache-flink - 在 Scala Shell 中实例化 StreamExecutionEnvironment

apache-flink - Flink 算子间共享状态

java - Flink DataStream - 如何从输入元素启动源?

apache-flink - Flink Kafka 连接器 0.10.0 事件时间澄清和 ProcessFunction 澄清

java - Apache 弗林克 : Standalone Cluster tries to connect with username "flink"

apache-flink - flink 如何处理早期事件?忽略或创建单独的窗口?

java - Flink使用gradle代替maven的优缺点

java - 弗林克 : Jar file execution on Flink cluster