python - python中的spark自定义排序

标签 python apache-spark pyspark

我在 Spark 中有一个 RDD(下面的 python 代码):

list1 = [(1,1),(10,100)]
df1 = sc.parallelize(list1)
df1.take(2)
## [(1, 1), (10, 100)]

我想进行自定义排序,根据元组中的两个条目比较这些元组。在 python 中,这种比较的逻辑类似于:

# THRESH is some constant
def compare_tuple(a, b):
    center = a[0] - b[0]
    dev = a[1] + b[1]
    r = center / dev
    if r < THRESH:
        return -1
    else if r == THRESH:
        return 0
    else:
        return 1

我会在 python 中进行自定义排序:

list1.sort(compare_tuple)

如何在 pyspark 中执行此操作?根据 rdd 文档:

https://spark.apache.org/docs/1.4.1/api/python/pyspark.html#pyspark.RDD

sortBy 方法没有自定义排序参数。

我看到 scala 接口(interface) sortBy 支持这个:

https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.rdd.RDD

但我想在 python spark 中使用它。也欢迎任何解决方法类型的解决方案,谢谢!

最佳答案

您始终可以创建自定义类并实现丰富的比较方法:

  • 对.py

    class Pair(tuple):
        def _cmp(self, other):
            center = self[0] - other[0]
            dev = self[1] + other[1]
            r = center / dev if dev != 0 else center
            if r < 0:
                return -1
            if r >  0:
                return 1
            return 0
    
        def __lt__(self, other):
            return self._cmp(other) < 0
    
        def __lte__(self, other):
            return self._cmp(other) <= 0
    
        def __eq__(self, other):
            return self._cmp(other) == 0
    
        def __ge__(self, other):
            return self._cmp(other) >= 0
    
        def __gt__(self, other):
            return self._cmp(other) > 0
    
  • 主脚本

    from pair import Pair
    
    sc.addPyFile("pair.py")
    
    rdd = sc.parallelize([(1, 1),(10, 100), (-1, 1), (-1, -0.5)]).map(Pair)
    rdd.sortBy(lambda x: x).collect()
    ## [(-1, 1), (-1, -0.5), (1, 1), (10, 100)]
    

但如果 dev 是一个标准差,那么它不会影响结果,您可以使用普通元组或提取 centers 的 keyfunc 按身份安全地排序(lambda x x:[0])。

关于python - python中的spark自定义排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34526786/

相关文章:

python - 只能使用分区数相同的 RDD 进行 zip 错误

python - 使用 toPandas() 方法将 spark 数据帧转换为 Pandas 数据帧时会发生什么

python - 打印乐谱 - Pygame

python - 让 python os.chdir 跟随 vim autochdir?

python - Numpy:从较小的矩阵创建矩阵

scala - 如何在spark中为diff文件名调用单独的逻辑

apache-spark - Databricks 连接失败,方案 : abfss 没有文件系统

数据框 API 与 Spark.sql

python - 如何在 PySpark 的 UDF 中返回 "Tuple type"?

python - 如何在 borb 2.1.16 中设置页面布局边距