java - 如何对每个分区中元素数量不同的两个 RDD 执行类似 zip 的操作?

标签 java apache-spark

我使用的是 Spark 1.1.0。

我有2个RDD firstSamplesecondSample类型 JavaRDD<IndividualBean> 。这些RDD的内容如下:

[
IndividualBean [params={...}], 
IndividualBean [params={...}], 
IndividualBean [params={...}]
]

[
IndividualBean [params={...}], 
IndividualBean [params={...}], 
IndividualBean [params={...}]
]

当我尝试 zip将它们放在一起,我收到以下错误:

Can only zip RDDs with same number of elements in each partition

我猜这是因为我的 RDD 没有相同数量的分区,或者每个分区的元素数量不同。

我想对这些 RDD 执行操作,得到与 zip 相同的结果.

现在,我找到了以下解决方案( totalSize 变量的大小正好是 firstSample.union(secondSample) ):

JavaPairRDD<IndividualBean, IndividualBean> zipped = firstSample.union(secondSample).zipWithIndex().mapToPair(
            new PairFunction<Tuple2<IndividualBean,Long>, Long, IndividualBean>() {
                @Override
                public Tuple2<Long, IndividualBean> call(
                        Tuple2<IndividualBean, Long> tuple) throws Exception {
                    return new Tuple2<Long, IndividualBean>(tuple._2, tuple._1);
                }
    }).groupBy(new Function<Tuple2<Long,IndividualBean>, Long>() {
        @Override
        public Long call(Tuple2<Long, IndividualBean> tuple) throws Exception {
            long index = tuple._1.longValue();
            if(index < totalSize/2){
                return index+totalSize/2;
            }
            return index;
        }
    }).values().mapToPair(new PairFunction<Iterable<Tuple2<Long, IndividualBean>>, IndividualBean, IndividualBean>() {
        @Override
        public Tuple2<IndividualBean, IndividualBean> call(
                Iterable<Tuple2<Long, IndividualBean>> iterable) throws Exception {
            Iterator<Tuple2<Long, IndividualBean>> it = iterable.iterator();
            IndividualBean firstBean = it.next()._2;
            IndividualBean secondBean = it.next()._2;
            return new Tuple2<IndividualBean, IndividualBean>(firstBean, secondBean);
        }
    });

但是它非常昂贵,因为它涉及洗牌。

什么是更好的方法来做到这一点?

最佳答案

Scala 中的解决方案,因为这就是我所有 Spark 编程的方式。

此解决方案的关键是始终保持相同的分区方案,然后将各个分区压缩在一起。为了实现这一目标,解决方案快速而松散地进行采样。特别是,与每个随机选择的点配对的数据点是:

  1. 从同一分区选择
  2. 不是随机选择的(事实上往往来自原始 RDD 中紧邻它的位置)

第一个简化对于解决方案至关重要。可以通过向下面定义的 zipFunc 添加一些代码以重新排序 zip 的一侧来删除第二个。

了解 zipFunc 的作用很重要:我将示例及其补集压缩在一起,而它们的大小甚至不一样。我简单地压缩了两个 RDD 中相应分区的内容,即使它们没有相同数量的样本:当我用完 zip 一侧的样本时,我只是将另一侧的样本删除。

val testRDD = sc.parallelize(1 to 1000, 4)

val firstSample = testRDD.sample(false, 0.4)
val remaining = testRDD.subtract(firstSample)

def zipFunc(l: Iterator[Int], r: Iterator[Int]) : Iterator[(Int,Int)] = {
  val res = new ListBuffer[(Int, Int)]
  // exercise for the reader: suck either l or r into a container before iterating 
  // and consume it in random order to achieve more random pairing if desired
  while (l.hasNext && r.hasNext) {
    res += ((l.next(), r.next()))
  }
  res.iterator
}
// notice the `true` to make sure partitioning is preserved
val pairs:RDD[(Int,Int)] = firstSample.zipPartitions(remaining, true)(zipFunc)

据我所知,这不需要跨分区通信。这确实取决于你的 sample 从各个分区中相当均匀地绘制,根据我的经验,sample() 方法在这方面还不错。

关于java - 如何对每个分区中元素数量不同的两个 RDD 执行类似 zip 的操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26076319/

相关文章:

apache-spark - Spark DataSet 和 RDD 有什么区别

java - 如何通过索引获取列的名称?

java - 在不使用 split Java 的情况下从字符串中添加数字

java - 将字符串转换为日期对象

java - Spark sql 连接 mongo-spark 和 Spark-redshift 连接器的性能问题

java - Apache Spark DataFrame 没有 RDD 分区

python - spark "package.TreeNodeException"错误 python "java.lang.RuntimeException: Couldn' t find pythonUDF"

java - GUI 应用程序中抛出未经检查的异常

java - LWJGL 获取显示尺寸无法正常工作

Java/MySQL : Inserting row into table from Java Web Application (TomCat v7. 0)