python - Pyspark:基于类型的值操作

标签 python apache-spark pyspark rdd

我有这样一个RDD:

[('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))]

因此键值可以是单个 tupletupletuple 作为值。这来自 reduceByKey。 我需要执行一个简单的操作:将 S 的计数除以 (H + S) 的计数。 当 S 不存在时,就像第一项的情况一样,我将不得不返回 0。 问题是将第一种情况(单个 tuple)与第二种情况(tuple of two tuple)隔离开,这样我就知道如何操作 map

我将如何进行?

最佳答案

一般来说,在上游修复这个问题更有意义,但您可以尝试这样的事情:

from operator import truediv

def f(vs):
    try:
        d = dict(vs)
    except ValueError:
        d = dict([vs])

    s = sum(d.values())
    return truediv(d.get("S", 0), s) if s else float('nan')

rdd = sc.parallelize([('a', ('H', 1)), ('b', (('H', 41), ('S', 1)))])
rdd.mapValues(f).collect()

## [('a', 0.0), ('b', 0.023809523809523808)]

或者,如果你不介意外部依赖,你可以尝试使用multipledispatch :

from multipledispatch import dispatch

@dispatch(tuple, tuple)
def f(h, s):
    try:
        return truediv(s[1], h[1] + s[1])
    except ZeroDivisionError:
        return float('nan')

@dispatch(str, int)
def f(x, y):
    return 0.0

rdd.mapValues(lambda args: f(*args)).collect()
## [('a', 0.0), ('b', 0.023809523809523808)]

关于python - Pyspark:基于类型的值操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35784548/

相关文章:

python-3.x - 使用 Jupyter Notebook 中的 PySpark 从 AWS EMR 集群读取存储在 AWS S3 中的解压缩 Shapefile

android - 使用Android的PyGame子集进行音频

docker - 在 Kubernetes 上运行 Spark 访问 kerberized Hadoop 集群时,如何解决执行器上的 "SIMPLE authentication is not enabled"错误?

scala - 使用 foreach 行在数据框中捕获和写入字符串

python - 使用 Pyspark 读取 S3 上的随机文件样本

azure - 使用 Databricks PySpark 解压缩大文件

python正则表达式选择整个单词

python - Django - 从 geraldo 报告中删除无关的 unicode 符号

python - Scrapy 显示 notImplementedError 我不知道为什么

scala - 使用 Scala 转换 LabeledPoint 中 Vector 的 RDD - Apache Spark 中的 MLLib