apache-flink - Flink流: Get top n of elements for each timewindow

标签 apache-flink flink-streaming

我有数据流的元组( float ,字符串),我想对其进行排序并为每个时间窗口(固定)选择三个最大值。数据流的窗口化是根据处理时间并按自然顺序排序。

使用Flink 1.0.1,以下是我的尝试

          val topTasks = new mutable.PriorityQueue[(Float, String)](Ordering.Tuple2.reverse) //Ex:(5250, "mytask")
          //Get stream and other operations ...
          val sortMetricStream = metricStream
                       .map { metrics =>
                         topTasks.enqueue(metrics._1, metrics._2)
                       }
                       .timeWindowAll(Time.seconds(10))
                       .reduce({ (topTasks.dequeue()._2, topTasks.dequeue()._2, topTasks.dequeue()._2)
                       })

    val sortMetricStream = metricStream
                       .timeWindowAll(Time.seconds(10))
                       .partitionByRange(0)
                       .sortPartition(0, Order.DESCENDING)

在任一 sortMetricStream 中都没有给我预期的任务名称。

对此的任何帮助将不胜感激。

最佳答案

使用apply(...)而不是reduce(...) (参见https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#datastream-transformations)

通过使用 WindowFunction#apply() 的迭代器您可以在内部缓冲窗口的所有记录(例如在列表中),然后排序(列表),最后发出结果。您可以调用Collector#collect()零次、一次或多次。

关于apache-flink - Flink流: Get top n of elements for each timewindow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39263660/

相关文章:

scala - 无法将()自定义函数应用于 Flink 上的 WindowedStream

hadoop - 如何获取FlinkSQL中Row的长度?

java - 如何更改独立 FLINK 集群中单个应用程序的日志记录级别

apache-flink - Flink 用于无状态处理

apache-flink - Apache 弗林克 : Window Functions and the beginning of time

apache-flink - Apache Flink : What's the difference between side outputs and split() in the DataStream API?

apache-flink - 在不重启作业的情况下在 Apache Flink 中动态添加模式

java - Flink 1.9.1 无法再连接到 Azure Event Hub

database - 通过 Flink 实时更新 Dgraph

sql - Apache Calcite 是否提供添加自定义子句或语句的方法?