我有这样一个RDD:
[('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))]
因此键值可以是单个 tuple
或 tuple
的 tuple
作为值。这来自 reduceByKey
。
我需要执行一个简单的操作:将 S 的计数除以 (H + S) 的计数。
当 S 不存在时,就像第一项的情况一样,我将不得不返回 0。
问题是将第一种情况(单个 tuple
)与第二种情况(tuple
of two tuple
)隔离开,这样我就知道如何操作 map
。
我将如何进行?
最佳答案
一般来说,在上游修复这个问题更有意义,但您可以尝试这样的事情:
from operator import truediv
def f(vs):
try:
d = dict(vs)
except ValueError:
d = dict([vs])
s = sum(d.values())
return truediv(d.get("S", 0), s) if s else float('nan')
rdd = sc.parallelize([('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))])
rdd.mapValues(f).collect()
## [('a', 0.0), ('b', 0.023809523809523808)]
或者,如果你不介意外部依赖,你可以尝试使用multipledispatch
:
from multipledispatch import dispatch
@dispatch(tuple, tuple)
def f(h, s):
try:
return truediv(s[1], h[1] + s[1])
except ZeroDivisionError:
return float('nan')
@dispatch(str, int)
def f(x, y):
return 0.0
rdd.mapValues(lambda args: f(*args)).collect()
## [('a', 0.0), ('b', 0.023809523809523808)]
关于python - Pyspark:基于类型的值操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35784548/