我有一个(键,值)元素的 RDD。键是 NumPy 数组。 NumPy 数组不可散列,当我尝试执行 reduceByKey
操作时,这会导致问题。
有没有办法为 Spark 上下文提供我的手动哈希函数?或者有没有其他方法可以解决这个问题(除了实际“离线”散列数组并仅将散列 key 传递给 Spark)?
这是一个例子:
import numpy as np
from pyspark import SparkContext
sc = SparkContext()
data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]])
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y)
rd.collect()
错误是:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
...
TypeError: unhashable type: 'numpy.ndarray'
最佳答案
最简单的解决方案是将其转换为可散列的对象。例如:
from operator import add
reduced = sc.parallelize(data).map(
lambda x: (tuple(x), x.sum())
).reduceByKey(add)
并在需要时将其转换回来。
Is there a way to supply the Spark context with my manual hash function
这不是一个简单的问题。整个机制依赖于对象实现了 __hash__ 方法并且 C 扩展不能被猴子修补这一事实。您可以尝试使用调度来覆盖 pyspark.rdd.portable_hash ,但即使您考虑转换成本,我也怀疑这是值得的。
关于python - Spark : How to "reduceByKey" when the keys are numpy arrays which are not hashable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39620767/