python - Spark : How to "reduceByKey" when the keys are numpy arrays which are not hashable?

标签 python numpy pyspark rdd

我有一个(键,值)元素的 RDD。键是 NumPy 数组。 NumPy 数组不可散列,当我尝试执行 reduceByKey 操作时,这会导致问题。

有没有办法为 Spark 上下文提供我的手动哈希函数?或者有没有其他方法可以解决这个问题(除了实际“离线”散列数组并仅将散列 key 传递给 Spark)?

这是一个例子:

import numpy as np
from pyspark import SparkContext

sc = SparkContext()

data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]])
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y)
rd.collect()

错误是:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

...

TypeError: unhashable type: 'numpy.ndarray'

最佳答案

最简单的解决方案是将其转换为可散列的对象。例如:

from operator import add

reduced = sc.parallelize(data).map(
    lambda x: (tuple(x), x.sum())
).reduceByKey(add)

并在需要时将其转换回来。

Is there a way to supply the Spark context with my manual hash function

这不是一个简单的问题。整个机制依赖于对象实现了 __hash__ 方法并且 C 扩展不能被猴子修补这一事实。您可以尝试使用调度来覆盖 pyspark.rdd.portable_hash ,但即使您考虑转换成本,我也怀疑这是值得的。

关于python - Spark : How to "reduceByKey" when the keys are numpy arrays which are not hashable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39620767/

相关文章:

python - 将硬编码字符串分配给变量 pytumblr 时有效,但动态传递时不起作用

python - 如何查询给定TRX地址的TRC20代币余额?

python - K-Means:将集群分配给新的数据点

Python numpy groupby 多列

scala - 使用 pyspark 读取 csv 文件时获取格式错误记录的列名称

python - PySpark安装错误

python - 前向链表的问题

python - 将数组从 np.triu_indices 转换为对称矩阵

apache-spark - pyspark数据框添加一列(如果不存在)

python - numpy 中两个二维掩码数组的快速内积