python - PySpark takeOrdered 多个字段(升序和降序)

标签 python sorting apache-spark pyspark rdd

pyspark.RDD 中的 takeOrdered 方法从按升序排列的 RDD 中获取 N 个元素,或按此处所述的可选键函数指定 pyspark.RDD.takeOrdered .该示例显示了以下带有一键的代码:

>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
[10, 9, 7, 6, 5, 4]

是否也可以定义更多的键,例如x、y、z 用于具有 3 列的数据?

键的顺序应该不同,例如 x= asc、y= desc、z=asc。这意味着如果两行的第一个值 x 相等,则第二个值 y 应该按降序使用。

最佳答案

对于数字你可以这样写:

n = 1
rdd = sc.parallelize([
    (-1, 99, 1), (-1, -99, -1), (5, 3, 8), (-1, 99, -1)
])

rdd.takeOrdered(n, lambda x: (x[0], -x[1], x[2]))
# [(-1, 99, -1)]

对于其他对象你可以定义某种类型的记录类型并定义你自己的一组丰富的比较方法:

class XYZ(object):
    slots = ["x", "y", "z"]

    def __init__(self, x, y, z):
        self.x, self.y, self.z = x, y, z

    def __eq__(self, other):
        if not isinstance(other, XYZ):
            return False
        return self.x == other.x and self.y == other.y and self.z == other.z

    def __lt__(self, other):
        if not isinstance(other, XYZ):
            raise ValueError(
                "'<' not supported between instances of 'XYZ' and '{0}'".format(
                    type(other)
            ))
        if self.x == other.x:
            if self.y == other.y:
                return self.z < other.z
            else:
                return self.y > other.y
        else:
            return self.x < other.x

    def __repr__(self):
        return "XYZ({}, {}, {})".format(self.x, self.y, self.z)

    @classmethod
    def from_tuple(cls, xyz):
        x, y, z = xyz
        return cls(x, y, z)

然后:

from xyz import XYZ

rdd.map(XYZ.from_tuple).takeOrdered(n)
# [XYZ(-1, 99, -1)]

在实践中只需使用 SQL:

from pyspark.sql.functions import asc, desc

rdd.toDF(["x", "y", "z"]).orderBy(asc("x"), desc("y"), asc("z")).take(n)
# [Row(x=-1, y=99, z=-1)]

关于python - PySpark takeOrdered 多个字段(升序和降序),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48703081/

相关文章:

python - 按索引和列排序

arrays - 对 varchar2 数组进行排序 (apex_t_varchar2)

apache-spark - Spark MLlib : Difference between implicitTrain and explicitTrain

scala - 按字段 Scala 中的值过滤 rdd 行

hadoop - Spark 应用程序报告内存不足的 Oozie 工作流

python - skimage.io.imread 与 cv2.imread

python - matplotlib 中的 100% 堆积面积/直方图,X 轴上有日期

python - 将外键选择限制为 ManyToMany 字段选择

python - 如何将函数名作为字符串获取?

c - 字符串 C 中字母按字母顺序排序