python - PySpark(Python 2.7): How to flatten values after reduce

标签 python python-2.7 hadoop apache-spark pyspark

我正在使用带有自定义分隔符的 SparkContext.newAPIHadoopFile 读取多行记录文件。反正我已经做好了准备,减少了我的数据。但现在我想再次将 key 添加到每一行(条目),然后将其写入 Apache Parquet 文件,然后将其存储到 HDFS 中。

这个图应该可以解释我的问题。我正在寻找的是红色箭头,例如写入文件之前的最后一次转换。任何想法?我尝试了 flatMap,但时间戳和浮点值导致了不同的记录。

PySpark chain

Python 脚本可以是 downloaded here和样本text file here 。我在 Jupyter Notebook 中使用 Python 代码。

最佳答案

简单的列表理解应该足够了:

from datetime import datetime


def flatten(kvs):
    """
    >>> kvs = ("852-YF-008", [
    ... (datetime(2016, 5, 10, 0, 0), 0.0),
    ... (datetime(2016, 5, 9, 23, 59), 0.0)])
    >>> flat = flatten(kvs)
    >>> len(flat)
    2
    >>> flat[0]
    ('852-YF-008', datetime.datetime(2016, 5, 10, 0, 0), 0.0)
    """
    k, vs = kvs
    return [(k, v1, v2) for v1, v2 in vs]

在 Python 2.7 中,您还可以使用带有元组参数解包的 lambda 表达式,但这不可移植并且通常不鼓励:

lambda (k, vs): [(k, v1, v2) for v1, v2 in vs]

版本无关:

lambda kvs: [(kvs[0], v1, v2) for v1, v2 in kvs[1]]

编辑:

如果您需要的只是写入分区数据,则直接转换为 Parquet,无需 reduceByKey:

(sheet
    .flatMap(process)
    .map(lambda x: (x[0], ) + x[1])
    .toDF(["key", "datettime", "value"])
    .write
    .partitionBy("key")
    .parquet(output_path))

关于python - PySpark(Python 2.7): How to flatten values after reduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38140710/

相关文章:

python - 如何从非结构化文本创建 Python 字典?

python - 使用python和gdal对点数据进行IDW插值

hadoop - -Dpig.additional.jars 包含 HDFS 和本地文件系统上的文件

hadoop - 如何在Spark流中运行并发事件作业以及执行者之间的公平任务调度

python - 如何将整数值传递给 python 的 subprocess.call 方法?

javascript - 将python字典输出转换为流程图

python 2.7 存在于字典中

unix - 从UNIX中的固定宽度文件中提取特定的列

Python3找到最后一次出现的字符串然后写入

Python 在引号之间的文件中查找文本