python - 并行化 GZip 文件处理 Spark

标签 python hadoop apache-spark gzip pyspark

我有大量需要转换为 Parquet 的 GZip 文件。由于 GZip 的压缩特性,这不能针对一个文件并行化。

但是,既然我有很多,有没有比较简单的方法让每个节点做一部分文件呢?这些文件在 HDFS 上。我假设我不能使用 RDD 基础设施来编写 Parquet 文件,因为这一切都是在驱动程序上完成的,而不是在节点本身上完成的。

我可以并行化文件名列表,编写一个处理本地 Parquets 并将它们保存回 HDFS 的函数。我不知道该怎么做。我觉得我错过了一些明显的东西,谢谢!

这被标记为重复问题,但事实并非如此。我完全了解 Spark 能够将它们作为 RDD 读取而不必担心压缩,我的问题更多是关于如何并行地将这些文件转换为结构化 Parquet 文件。

如果我知道如何在没有 Spark 本身的情况下与 Parquet 文件交互,我可以这样做:

def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    write_csv_to_parquet_on_hdfs(file_to)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

这将允许我将其并行化,但我不知道如何从本地环境与 HDFS 和 Parquet 交互。我想知道:

1)怎么做

或者..

2) 如何使用 PySpark 以不同的方式并行化这个过程

最佳答案

我会建议以下两种方法之一(在实践中我发现第一种方法在性能方面提供更好的结果)。

将每个 Zip 文件写入一个单独的 Parquet 文件

在这里你可以使用 pyarrow 将 Parquet-File 写入 HDFS:

def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    pyarrow_table = to_pyarrow_table(gzipped_csv)
    hdfs_client = pyarrow.HdfsClient()
    with hdfs_client.open(file_to, "wb") as f:
        pyarrow.parquet.write_table(pyarrow_table, f)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

获取pyarrow.Table对象有两种方式:

  • 要么从 pandas DataFrame 获取它(在这种情况下,您也可以使用 pandas 的 read_csv() 函数):pyarrow_table = pyarrow.Table.from_pandas(pandas_df)

  • 或使用 pyarrow.Table.from_arrays 手动构建它

要使 pyarrow 与 HDFS 一起工作,需要正确设置多个环境变量,请参阅 here

将所有 Zip 文件中的行连接到一个 Parquet 文件中

def get_rows_from_gzip(file_from):
    rows = read_gzip_file(file_from)
    return rows

# read the rows of each zip file into a Row object
rows_rdd = filenameRDD.map(lambda x: get_rows_from_gzip(x[0]))

# flatten list of lists
rows_rdd = rows_rdd.flatMap(lambda x: x)

# convert to DataFrame and write to Parquet
df = spark_session.create_DataFrame(rows_rdd)
df.write.parquet(file_to)

如果您事先知道数据的架构,将架构对象传递给 create_DataFrame 将加速 DataFrame 的创建。

关于python - 并行化 GZip 文件处理 Spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35412014/

相关文章:

mysql - 使用 sqoop 将 mysql 查询导入到 hbase

python - 如何在 TensorFlow 中无周期边界滚动?

python - 尝试访问 multiprocessing.Pool 工作进程中的持久数据时出现不稳定的运行时异常

sql - 将两列合并为一列并格式化内容以在 Hive 中形成准确的日期时间格式?

java - 由于leveldbjni的unsatisfiedLinkError而无法启动失败的namenode

apache-spark - 如何在Spark Shell中使用TwitterUtils?

python - 将 csv 字典列转换为行 pyspark

apache-spark - 如何在 PySpark 应用程序中读取和写入 Google Cloud Bigtable 中的数据?

python - 如何加快csv导入sql的速度?

python - 消除趋势或过滤锯齿信号 (Python)