java - 在 flatMapToPair 中访问 HashMap

标签 java apache-spark hadoop

编辑:已经使用RDD.collectAsMap()解决了

我正在尝试复制 http://on-demand.gputechconf.com/gtc/2016/presentation/S6424-michela-taufer-apache-spark.pdf 第 28-30 页中的问题解决方案

我有一个在映射函数之外实例化的 HashMap。 HashMap包含以下数据:

{1:2, 2:3, 3:2, 4:2, 5:3}

先前定义的 RDD previousRDD 的类型为:

JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>>

有数据:

1: [(1,2), (1,5)]
2: [(2,1), (2,3), (2,5)]
3: [(3,2), (3,4)]
4: [(4,3), (4,5)]
5: [(5,1), (5,2), (5,4)]

我尝试使用 flatMapToPair 创建一个新的 RDD:

JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
    @Override
    public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> integerIterableTuple2) throws Exception {
        Integer count;
        ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
        count = hashMap.get(integerIterableTuple2._1);
        for (Tuple2<Integer, Integer> t : integerIterableTuple2._2) {
            Integer tcount = hashMap.get(t._2);
            if (count < tcount || (count.equals(tcount) && integerIterableTuple2._1 < t._2)) {
                list.add(t);
            }
        }
        return list.iterator();
    }
});

但在此情况下,for 循环内的 hashMap.get(t._2) 大部分时间都会获取 NULL。我已经检查了 HashMap 中的值是否正确。

有没有办法在 Spark 函数中正确获取 HashMap 的值?

最佳答案

它应该可以工作。 Spark 应该捕获您的变量,将其序列化并发送给每个任务的每个工作人员。您可以尝试广播此 map

sc.broadcast(hashMap)

并使用结果而不是hashMap。它在内存方面也更高效(每个执行器共享存储)。

关于java - 在 flatMapToPair 中访问 HashMap,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61133901/

相关文章:

java - Hadoop mapreduce空输入格式

java - 使用命令行在 Windows 中的 hadoop 上运行 wordcount.jar

java - Hadoop 查询、日期、循环、BASH 或 Java

java - 如何使用Firestore仅存储当前登录用户的数据

java - 从数据库中删除行

python - 在 Pyspark 中评估分类器时,“SparkSession”对象没有属性 'serializer'

apache-spark - 使用 spark.sql.autoBroadcastJoinThreshold 时,Spark Driver 不释放内存

java - 如何使用 ByteBuddy 向现有实例添加字段?

java - Spring Security 不允许登录页面

java - 尝试通过 Java SDK 将记录从 Spark DataFrame 写入 Dynamodb 时任务不可序列化