我在 Spark 中有一个 RDD(下面的 python 代码):
list1 = [(1,1),(10,100)]
df1 = sc.parallelize(list1)
df1.take(2)
## [(1, 1), (10, 100)]
我想进行自定义排序,根据元组中的两个条目比较这些元组。在 python 中,这种比较的逻辑类似于:
# THRESH is some constant
def compare_tuple(a, b):
center = a[0] - b[0]
dev = a[1] + b[1]
r = center / dev
if r < THRESH:
return -1
else if r == THRESH:
return 0
else:
return 1
我会在 python 中进行自定义排序:
list1.sort(compare_tuple)
如何在 pyspark 中执行此操作?根据 rdd 文档:
https://spark.apache.org/docs/1.4.1/api/python/pyspark.html#pyspark.RDD
sortBy 方法没有自定义排序参数。
我看到 scala 接口(interface) sortBy 支持这个:
https://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.rdd.RDD
但我想在 python spark 中使用它。也欢迎任何解决方法类型的解决方案,谢谢!
最佳答案
您始终可以创建自定义类并实现丰富的比较方法:
对.py
class Pair(tuple): def _cmp(self, other): center = self[0] - other[0] dev = self[1] + other[1] r = center / dev if dev != 0 else center if r < 0: return -1 if r > 0: return 1 return 0 def __lt__(self, other): return self._cmp(other) < 0 def __lte__(self, other): return self._cmp(other) <= 0 def __eq__(self, other): return self._cmp(other) == 0 def __ge__(self, other): return self._cmp(other) >= 0 def __gt__(self, other): return self._cmp(other) > 0
主脚本
from pair import Pair sc.addPyFile("pair.py") rdd = sc.parallelize([(1, 1),(10, 100), (-1, 1), (-1, -0.5)]).map(Pair) rdd.sortBy(lambda x: x).collect() ## [(-1, 1), (-1, -0.5), (1, 1), (10, 100)]
但如果 dev
是一个标准差,那么它不会影响结果,您可以使用普通元组或提取 centers 的
(keyfunc
按身份安全地排序lambda x x:[0]
)。
关于python - python中的spark自定义排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34526786/