我正在尝试根据给定的 PairRDD 创建新的 RDD。我有一个带有几个键的 PairRDD,但每个键都有很大(大约 100k)的值。我想以某种方式重新分区,使每个 Iterable<v>
到 RDD[v] 中,以便我可以进一步对这些值有效地应用 map、reduce、sortBy 等。我感觉 flatMapValues 是我的 friend ,但想与其他 Sparken 核实。这是用于实时 Spark 应用程序。我已经尝试过collect()并计算应用程序服务器内存中的所有测量值,但试图对其进行改进。
这就是我尝试的(伪)
class ComputeMetrices{
transient JavaSparkContext sparkContext;
/**
* This method compute 3 measures: 2 percentiles of different values and 1 histogram
* @param javaPairRdd
* @return
*/
public Map<String, MetricsSummary> computeMetrices(JavaPairRDD<String, InputData> javaPairRdd) {
JavaPairRDD<String, MetricsSummary> rdd = javaPairRdd.groupByKey(10).mapValues(itr => {
MetricsSummary ms = new MetricsSummary();
List<Double> list1
List<Double> list2
itr.foreach{ list1.add(itr._2.height); list2.add(itr._2.weight)}
//Here I want to convert above lists into RDD
JavaRDD<V> javaRdd1 = sparContext.parallelize(list1) //null pointer ; probably at sparkContext
JavaRDD<V> javaRdd2 = sparContext.parallelize(list2)
JavaPairRDD1 javaPairRdd1 = javaRdd1.sortBy.zipWithIndex()
JavaPairRDD2 javaPairRdd2 = javaRdd2.sortBy.zipWithIndex()
//Above two PairRDD will be used further to find Percentile values for range of (0..100)
//Not writing percentile algo for sake of brevity
double[] percentile1 = //computed from javaPairRdd1
double[] percentile2 = //computed from javaPairRdd2
ms.percentile1(percentile1)
ms.percentile2(percentile2)
//compute histogram
JavaDoubleRDD dRdd = sparkContext.parallelizeDoubles(list1)
long[] hist = dRdd.histogram(10)
ms.histo(hist)
return ms
})
return rdd.collectAsMap
}
}
我想从 groupByKey 结果的 Iterable 中创建 RDD,以便我可以使用进一步的 Spark 转换。
最佳答案
sparContext 为 null 的原因是,mapValues 中的代码是在工作线程上执行的 - 工作线程上没有可用的 sparContext,它仅在驱动程序上可用。
如果我理解您的代码,我可以告诉您,如果您希望 mapValues 生成排序和索引对,则无需创建。
请记住,该代码的结果如下所示:
RDD(String, V) ->groupByKey-> RDD(String, List(V))
->mapValues-> RDD(String, List(Int,V))
即
key1, List((0,V1), (0,V2)
key1, List((0,V1), (0,V2)
mapValues 独立应用于分组列表内的每个 V。所以计数器将始终为 0。
如果你想用 K 将单个 RDD 转换成多个 RDD,List(V) 比 flatMapValues 会帮助你。仍然存在问题 - 新 rdd 上的流式操作的效率如何 - map 和 reduce 肯定可以工作,但 sortBy 将取决于窗口的大小。
RDD(K, List(V)) -> flatMapValues(x=>x) -> RDD((K, V1), (K, V2) ... )
关于apache-spark - apache Spark - 从 groupByKey 结果的 Iterable 创建 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30857777/