我有数据流的元组( float ,字符串),我想对其进行排序并为每个时间窗口(固定)选择三个最大值。数据流的窗口化是根据处理时间并按自然顺序排序。
使用Flink 1.0.1,以下是我的尝试
val topTasks = new mutable.PriorityQueue[(Float, String)](Ordering.Tuple2.reverse) //Ex:(5250, "mytask")
//Get stream and other operations ...
val sortMetricStream = metricStream
.map { metrics =>
topTasks.enqueue(metrics._1, metrics._2)
}
.timeWindowAll(Time.seconds(10))
.reduce({ (topTasks.dequeue()._2, topTasks.dequeue()._2, topTasks.dequeue()._2)
})
和
val sortMetricStream = metricStream
.timeWindowAll(Time.seconds(10))
.partitionByRange(0)
.sortPartition(0, Order.DESCENDING)
在任一 sortMetricStream 中都没有给我预期的任务名称。
对此的任何帮助将不胜感激。
最佳答案
使用apply(...)
而不是reduce(...)
(参见https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#datastream-transformations)
通过使用 WindowFunction#apply()
的迭代器您可以在内部缓冲窗口的所有记录(例如在列表中),然后排序(列表),最后发出结果。您可以调用Collector#collect()
零次、一次或多次。
关于apache-flink - Flink流: Get top n of elements for each timewindow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39263660/