我正在使用Flink v.1.4.0
。
我正在使用 DataSet
API,我想尝试的事情之一与 Apache Spark
中使用广播变量的方式非常相似。
实际上,我想在 DataSet
上应用映射函数,遍历 DataSet
中的每个元素并在 HashMap< 中搜索它
;如果搜索元素存在于 map 中,则检索相应的值。
HashMap
非常大,我不知道(因为我什至还没有构建我的解决方案)它是否需要Serialized
才能传输和使用所有 worker 同时进行。
一般来说,我想到的解决方案如下所示:
Map<String, T> hashMap = new ... ;
DataSet<Point> points = env.readCsv(...);
points
.map(point -> hashMap.getOrDefault(point.getId, 0))
...
但我不知道这是否有效或者是否有效。经过一番搜索后,我发现了一个更好的例子 here根据这一点,我们可以使用 Flink
中的 Broadcast
变量来广播 List
,如下所示:
DataSet<Point> points = env.readCsv(...);
DataSet<Centroid> centroids = ... ; // some computation
points.map(new RichMapFunction<Point, Integer>() {
private List<Centroid> centroids;
@Override
public void open(Configuration parameters) {
this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
}
@Override
public Integer map(Point p) {
return selectCentroid(centroids, p);
}
}).withBroadcastSet("centroids", centroids);
但是,.getBroadcastVariable()
似乎仅适用于List
。
- 有人可以通过
HashMap
提供替代解决方案吗? - 该解决方案如何运作?
- 解决这个问题最有效的方法是什么?
- 可以使用 Flink 托管状态来执行类似于使用广播变量的操作吗?如何?
- 最后,我可以尝试在管道中使用多个广播变量进行多个
映射
吗?
最佳答案
hashMap
的值在哪里?来自?另外两种可能的解决方案:
- 重新初始化/重新创建/重新生成
hashMap
在 open 中单独的过滤/映射运算符的每个实例中方法。每条记录的效率可能更高,但会重复初始化逻辑。 - 创建两个
DataSet
,一个代表hashMap
值,第二个为points
和 join那两个DataSet
使用desired join strategy 。打个比方,你想要做的事情可以通过 SQL 查询SELECT * FROM points p, hashMap h WHERE h.key = p.id
来表达。 。
关于java - 在 Flink 中广播 HashMap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49039896/