编辑:已经使用RDD.collectAsMap()
解决了
我正在尝试复制 http://on-demand.gputechconf.com/gtc/2016/presentation/S6424-michela-taufer-apache-spark.pdf 第 28-30 页中的问题解决方案
我有一个在映射函数之外实例化的 HashMap。 HashMap包含以下数据:
{1:2, 2:3, 3:2, 4:2, 5:3}
先前定义的 RDD previousRDD 的类型为:
JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>>
有数据:
1: [(1,2), (1,5)]
2: [(2,1), (2,3), (2,5)]
3: [(3,2), (3,4)]
4: [(4,3), (4,5)]
5: [(5,1), (5,2), (5,4)]
我尝试使用 flatMapToPair 创建一个新的 RDD:
JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
@Override
public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> integerIterableTuple2) throws Exception {
Integer count;
ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
count = hashMap.get(integerIterableTuple2._1);
for (Tuple2<Integer, Integer> t : integerIterableTuple2._2) {
Integer tcount = hashMap.get(t._2);
if (count < tcount || (count.equals(tcount) && integerIterableTuple2._1 < t._2)) {
list.add(t);
}
}
return list.iterator();
}
});
但在此情况下,for 循环内的 hashMap.get(t._2)
大部分时间都会获取 NULL。我已经检查了 HashMap 中的值是否正确。
有没有办法在 Spark 函数中正确获取 HashMap 的值?
最佳答案
它应该可以工作。 Spark 应该捕获您的变量,将其序列化并发送给每个任务的每个工作人员。您可以尝试广播此 map
sc.broadcast(hashMap)
并使用结果而不是hashMap
。它在内存方面也更高效(每个执行器共享存储)。
关于java - 在 flatMapToPair 中访问 HashMap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61133901/