我有一个像这样的RDD:
JavaPairRDD<PointFlag, Point> keyValuePair = ...
我想输出一个RDD,如下所示:
JavaRDD<Point> valuesRemainingAfterProcessing = processAndOutputSkylinePoints(keyValuePair)
处理将在单个节点中进行,因为处理的发生需要所有值。 (对它们和它们的标志进行比较)
我想做的是:
- 将所有内容映射到一个 ID:
JavaPairRDD<Integer, Tuple2<PointFlag, Point>> singleIdRDD = keyValuePair.mapToPair(fp -> new Tuple2(0, fp));
- 进行处理:
JavaRDD<Iterable<Point>> iterableGlobalSkylines = singleIdRDD.map(ifp -> calculateGlobalSkyline(ifp));
(calculateGlobalSkyline() 返回List<Point>
) - 转换为
JavaRDD<Point>
:JavaRDD<Point> globalSkylines = iterableGlobalSkylines.flatMap(p -> p);
这对我来说看起来像是一个肮脏的黑客,我想知道是否有更好的方法来做到这一点。
最佳答案
我发现的一个很好的解决方案(绝对不那么冗长)是使用 Spark API 中的 glom()
函数。此函数返回前一个 RDD 的所有元素的单个 List
或用官方术语来说:
返回通过将每个分区内的所有元素合并到列表中而创建的 RDD。
首先,您必须将 RDD 缩减为单个分区。解决办法如下:
JavaPairRDD<PointFlag, Point> keyValuePair = ...;
JavaPairRDD<PointFlag, Point> singlePartition = keyValuePair.coalesce(1);
JavaRDD<List<Tuple2<PointFlag, Point>>> groupedOnASingleList = keyValuePair.glom();
JavaRDD<Point> globalSkylinePoints = groupedOnASingleList.flatMap(singleList -> getGlobalSkylines(singleList));
如果有人有更好的答案,请随时发布。
关于java - 在单个节点中运行 Spark 计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27103977/