python - partitionBy 分配分区,但每个分区中的 WHERE

标签 python apache-spark pyspark rdd

使用哈希函数:

balanceLoad = lambda x: bisect.bisect_left(boundary_array, -keyfunc(x))

其中boundary_array为[-64, -10, 35]

下面告诉我将每个元素分配给哪个分区

rdd.partitionBy(numPartitions, balanceLoad)

但是,有没有办法确定/控制它们在每个分区中分配/放置的位置? {1,2,3} 与 {3,2,1}。

例如,当我这样做时:

rdd = CleanRDD(sc.parallelize(range(100), 4).map(lambda x: (x *((-1) ** x) , x)))

sortByKey(rdd, keyfunc=lambda key: key, ascending=False).collect()

每个分区中的元素顺序相反:

[(64, 64), (66, 66), (68, 68), (70, 70), (72, 72), (74, 74), (76, 76), (78, 78), (80, 80), (82, 82), (84, 84), (86, 86), (88, 88), (90, 90), (92, 92), (94, 94), (96, 96), (98, 98), (10, 10), (12, 12), (14, 14), (16, 16), (18, 18), (20, 20), (22, 22), (24, 24), (26, 26), (28, 28), (30, 30), (32, 32), (34, 34), (36, 36), (38, 38), (40, 40), (42, 42), (44, 44), (46, 46), (48, 48), (50, 50), (52, 52), (54, 54), (56, 56), (58, 58), (60, 60), (62, 62), (-35, 35), (-33, 33), (-31, 31), (-29, 29), (-27, 27), (-25, 25), (-23, 23), (-21, 21), (-19, 19), (-17, 17), (-15, 15), (-13, 13), (-11, 11), (-9, 9), (-7, 7), (-5, 5), (-3, 3), (-1, 1), (0, 0), (2, 2), (4, 4), (6, 6), (8, 8), (-99, 99), (-97, 97), (-95, 95), (-93, 93), (-91, 91), (-89, 89), (-87, 87), (-85, 85), (-83, 83), (-81, 81), (-79, 79), (-77, 77), (-75, 75), (-73, 73), (-71, 71), (-69, 69), (-67, 67), (-65, 65), (-63, 63), (-61, 61), (-59, 59), (-57, 57), (-55, 55), (-53, 53), (-51, 51), (-49, 49), (-47, 47), (-45, 45), (-43, 43), (-41, 41), (-39, 39), (-37, 37)]

请注意,三组中每一组中的元素顺序相反。 我该如何纠正这个问题?

最佳答案

确定否,因为洗牌的顺序是不确定的。

您可以控制顺序,但不能作为分区过程的一部分,或者至少不能在 PySpark 中控制。相反,您可以采用类似于 sortByKey 的方法,然后强制执行每个分区的顺序:

def applyOrdering(iter):
    """Takes an itertools.chain object
    and returns iterable with specific ordering"""
    ... 

rdd.partitionBy(numPartitions, balanceLoad).mapPartitions(applyOrdering)

请注意,iter 可能太适合内存,因此您应该增加粒度或使用不需要一次读取所有数据的排序机制。

关于python - partitionBy 分配分区,但每个分区中的 WHERE,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35821317/

相关文章:

python - Spark Streaming - 滑动窗口和不同的使用

java - Java Spark 中 "for"循环中某种类型的计数器

python - PySpark 序列化 EOFError

python - PySpark 新列,从整数列表中选择值

python - 更改 SymPy 多项式中以 p 为模的系数

python - 比较两个名字的相似度并使用神经网络识别重复项

python - 将字符串拆分为单词和标点符号

python - 制作数据长度不足的numpy矩阵

java - 在 Spark 0.9.0 上运行作业会抛出错误

mysql - pyspark mysql jdbc load 调用o23.load时出错没有合适的驱动