我有一个 ipython 笔记本,其中包含 pyspark 代码,它在我的机器上运行良好,但是当我尝试在另一台机器上运行它时,它会在这一行(rdd3 行)抛出错误:
rdd2 = sc.parallelize(list1)
rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))
list = rdd3.collect()
我得到的错误是:
ValueError Traceback (most recent call last)
<ipython-input-7-9daab52fc089> in <module>()
---> 16 rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))
/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in zip(self, other)
1960
1961 if self.getNumPartitions() != other.getNumPartitions():
-> 1962 raise ValueError("Can only zip with RDD which has the same number of partitions")
1963
1964 # There will be an Exception in JVM if there are different number
我不知道为什么这个错误出现在一台机器上而在另一台机器上却没有? ValueError:只能使用具有相同分区数的 RDD 进行压缩
最佳答案
zip
一般来说是一个棘手的操作。它要求两个 RDD 不仅具有相同的分区数量,而且每个分区的元素数量也相同。
排除一些特殊情况,只有当两个 RDD 具有相同的祖先并且之间不存在可能改变元素数量(filter
、flatMap
)的混洗和操作时,才能保证这一点共同的祖先和当前的状态。通常,它仅意味着map
(1 对1)转换。
如果您知道顺序会保留,但分区数量或每个分区的元素数量不同,您可以使用带有索引的 join
:
from operator import itemgetter
def custom_zip(rdd1, rdd2):
index = itemgetter(1)
def prepare(rdd, npart):
return (rdd.zipWithIndex()
.sortByKey(index, numPartitions=npart)
.keys())
npart = rdd1.getNumPartitions() + rdd2.getNumPartitions()
return prepare(rdd1, npart).zip(prepare(rdd2, npart))
rdd1 = sc.parallelize(["a_{}".format(x) for x in range(20)], 5)
rdd2 = sc.parallelize(["b_{}".format(x) for x in range(20)], 10)
rdd1.zip(rdd2).take(5)
## ValueError Traceback (most recent call last)
## ...
## ValueError: Can only zip with RDD which has the same number of partitions
custom_zip(rdd1, rdd2).take(5)
## [('a_0', 'b_0'), ('a_1', 'b_1'), ('a_2', 'b_2'),
## ('a_3', 'b_3'), ('a_4', 'b_4')]
Scala 的等价物是这样的:
def prepare[T: ClassTag](rdd: RDD[T], n: Int) =
rdd.zipWithIndex.sortBy(_._2, true, n).keys
def customZip[T: ClassTag, U: ClassTag](rdd1: RDD[T], rdd2: RDD[U]) = {
val n = rdd1.partitions.size + rdd2.partitions.size
prepare(rdd1, n).zip(prepare(rdd2, n))
}
val rdd1 = sc.parallelize((0 until 20).map(i => s"a_$i"), 5)
val rdd2 = sc.parallelize((0 until 20).map(i => s"b_$i"), 10)
rdd1.zip(rdd2)
// java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
// at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRD
// ...
customZip(rdd1, rdd2).take(5)
// Array[(String, String)] =
// Array((a_0,b_0), (a_1,b_1), (a_2,b_2), (a_3,b_3), (a_4,b_4))
关于python - 只能使用分区数相同的 RDD 进行 zip 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32084368/