python - 只能使用分区数相同的 RDD 进行 zip 错误

标签 python apache-spark ipython pyspark rdd

我有一个 ipython 笔记本,其中包含 pyspark 代码,它在我的机器上运行良好,但是当我尝试在另一台机器上运行它时,它会在这一行(rdd3 行)抛出错误:

rdd2 = sc.parallelize(list1) 
rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))
list = rdd3.collect()

我得到的错误是:

    ValueError                                Traceback (most recent call last)
    <ipython-input-7-9daab52fc089> in <module>()

    ---> 16 rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))


    /usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in zip(self, other)
       1960 
       1961         if self.getNumPartitions() != other.getNumPartitions():
    -> 1962             raise ValueError("Can only zip with RDD which has the same number of partitions")
       1963 
       1964         # There will be an Exception in JVM if there are different number

我不知道为什么这个错误出现在一台机器上而在另一台机器上却没有? ValueError:只能使用具有相同分区数的 RDD 进行压缩

最佳答案

zip 一般来说是一个棘手的操作。它要求两个 RDD 不仅具有相同的分区数量,而且每个分区的元素数量也相同。

排除一些特殊情况,只有当两个 RDD 具有相同的祖先并且之间不存在可能改变元素数量(filterflatMap)的混洗和操作时,才能保证这一点共同的祖先和当前的状态。通常,它仅意味着map(1 对1)转换。

如果您知道顺序会保留,但分区数量或每个分区的元素数量不同,您可以使用带有索引的 join :

from operator import itemgetter

def custom_zip(rdd1, rdd2):
    index = itemgetter(1)
    def prepare(rdd, npart):
        return (rdd.zipWithIndex()
                   .sortByKey(index, numPartitions=npart)
                   .keys())

    npart = rdd1.getNumPartitions() + rdd2.getNumPartitions() 

    return prepare(rdd1, npart).zip(prepare(rdd2, npart))     

rdd1 = sc.parallelize(["a_{}".format(x) for x in range(20)], 5)
rdd2 = sc.parallelize(["b_{}".format(x) for x in range(20)], 10)

rdd1.zip(rdd2).take(5)
## ValueError                                Traceback (most recent call last)
## ...
## ValueError: Can only zip with RDD which has the same number of partitions

custom_zip(rdd1, rdd2).take(5)
## [('a_0', 'b_0'), ('a_1', 'b_1'), ('a_2', 'b_2'), 
##     ('a_3', 'b_3'), ('a_4', 'b_4')]

Scala 的等价物是这样的:

def prepare[T: ClassTag](rdd: RDD[T], n: Int) = 
  rdd.zipWithIndex.sortBy(_._2, true, n).keys

def customZip[T: ClassTag, U: ClassTag](rdd1: RDD[T], rdd2: RDD[U]) = {
  val n = rdd1.partitions.size + rdd2.partitions.size
  prepare(rdd1, n).zip(prepare(rdd2, n))
}

val rdd1 = sc.parallelize((0 until 20).map(i => s"a_$i"), 5)
val rdd2 = sc.parallelize((0 until 20).map(i => s"b_$i"), 10)

rdd1.zip(rdd2)

// java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//  at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRD
//  ...

customZip(rdd1, rdd2).take(5)
// Array[(String, String)] = 
//   Array((a_0,b_0), (a_1,b_1), (a_2,b_2), (a_3,b_3), (a_4,b_4))

关于python - 只能使用分区数相同的 RDD 进行 zip 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32084368/

相关文章:

python - Azure 函数 HTTP 响应类型,使 API 在调用响应时下载 csv

apache-spark - 在 Spark 上配置单元 : java. lang.NoClassDefFoundError: org/apache/hive/spark/client/Job

python - PySpark 计算关联

python - ipython 和 bpython 有什么区别?

python - IPython Notebook 语言环境错误

使用 Django jquery ajax 文件上传

python - 如何在 PyQt5 中通过单击按钮将项目设置为正在编辑

hadoop - 部署 Spark 的最佳方式?

python - 从另一个笔记本以编程方式启动 jupyter notebook

python - 如何将两个二维数据框合并为一个多索引多维 Pandas 数据框?