Python 将 GCS 中的 .json 文件并行读取到 pandas DF 中

标签 python pandas parallel-processing google-cloud-storage python-asyncio

TL;DR asyncio vs multi-processing vs threading vs. 一些其他解决方案 并行化从 GCS 读取文件的 for 循环,然后将这些数据一起附加到 pandas 数据帧中,然后写入 BigQuery...

我想并行执行一个 python 函数,该函数从 GCS 目录读取数十万个小 .json 文件,然后将这些 .jsons 转换为 pandas 数据帧,然后将 pandas 数据帧写入 BigQuery 表。


import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):

    # my own function to get list of filenames from GCS directory
    files = get_gcs_file_list(directory=gcs_directory) # 

    # Create new table
    output_df = pd.DataFrame()
    fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
    counter = 0
    for file in files:

        # read files from GCS
        with, 'r') as f:
            gcs_data = json.loads(
            data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
            this_df = pd.DataFrame(data)
            output_df = output_df.append(this_df)

        # Write to BigQuery for every 5K rows of data
        counter += 1
        if (counter % 5000 == 0):
            pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
            output_df = pd.DataFrame() # and reset the dataframe

    # Write remaining rows to BigQuery
    pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')


  • grab ['gcs_dir/file1.json', 'gcs_dir/file2.json', ...],GCS中的文件名列表
  • 遍历每个文件名,并且:
    • 从 GCS 读取文件
    • 将数据转换为 pandas DF
    • 附加到主 pandas DF
    • 每 5K 循环,写入 BigQuery(因为随着 DF 变大,追加变得更慢)

我必须在几个 GCS 目录上运行这个函数,每个目录都有大约 500K 个文件。由于读取/写入这么多小文件的瓶颈,单个目录的这个过程将花费大约 24 小时......如果我能使它更加并行以加快速度,那就太好了,因为这似乎是一项任务有助于并行化。

编辑:下面的解决方案很有帮助,但我对从 python 脚本中并行运行特别感兴趣。 Pandas 正在处理一些数据清理,使用 bq load 会抛出错误。有 asyncio还有这个gcloud-aio-storage这两者似乎都可能对这项任务有用,可能是比线程或多处理更好的选择......


与其将并行处理添加到您的 Python 代码,不如考虑并行调用您的 Python 程序多次。这是一个技巧,它更适合在命令行上获取文件列表的程序。因此,为了这篇文章,让我们考虑更改程序中的一行:


# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) # 


files = sys.argv[1:]  # ok, import sys, too


PROCESSES=100 | xargs -P $PROCESSES your_program

xargs 现在将采用 输出的文件名并并行调用 your_program 多达 100 次,以适应尽可能多的文件每行都可以命名。我相信文件名的数量限于 shell 允许的最大命令大小。如果 100 个进程不足以处理您的所有文件,xargs 将再次(又一次)调用 your_program,直到它从 stdin 读取的所有文件名都得到处理。 xargs 确保同时运行的 your_program 调用不超过 100 个。您可以根据主机可用的资源来改变进程数。

关于Python 将 GCS 中的 .json 文件并行读取到 pandas DF 中,我们在Stack Overflow上找到一个类似的问题:


python - 按名称分组并获取带有 pandas 分层列的数据框

python - Pandas - Vlookup - 搜索列中的重复值

multithreading - knuthBendix 算法不能被 Control.Parallel 并行化?

使用 OpenMP 的 C++ 并行平均矩阵

python - 为什么 int 在 Python 中需要三倍的内存?

python - 如果值为零,则从库存(字典)中删除项目

python - 如何操作上下文管理器的 __exit__ 中的异常?

python - smoothxmpp 线程身份验证

python - 可编辑 QTableView 中的 Pandas df : remove check boxes

c - 并行矩阵计算