Python 将 GCS 中的 .json 文件并行读取到 pandas DF 中

标签 python pandas parallel-processing google-cloud-storage python-asyncio

TL;DR asyncio vs multi-processing vs threading vs. 一些其他解决方案 并行化从 GCS 读取文件的 for 循环,然后将这些数据一起附加到 pandas 数据帧中,然后写入 BigQuery...

我想并行执行一个 python 函数,该函数从 GCS 目录读取数十万个小 .json 文件,然后将这些 .jsons 转换为 pandas 数据帧,然后将 pandas 数据帧写入 BigQuery 表。

这是函数的非并行版本:

import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):

    # my own function to get list of filenames from GCS directory
    files = get_gcs_file_list(directory=gcs_directory) # 

    # Create new table
    output_df = pd.DataFrame()
    fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
    counter = 0
    for file in files:

        # read files from GCS
        with fs.open(file, 'r') as f:
            gcs_data = json.loads(f.read())
            data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
            this_df = pd.DataFrame(data)
            output_df = output_df.append(this_df)

        # Write to BigQuery for every 5K rows of data
        counter += 1
        if (counter % 5000 == 0):
            pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
            output_df = pd.DataFrame() # and reset the dataframe


    # Write remaining rows to BigQuery
    pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')

这个函数很简单:

  • grab ['gcs_dir/file1.json', 'gcs_dir/file2.json', ...],GCS中的文件名列表
  • 遍历每个文件名,并且:
    • 从 GCS 读取文件
    • 将数据转换为 pandas DF
    • 附加到主 pandas DF
    • 每 5K 循环,写入 BigQuery(因为随着 DF 变大,追加变得更慢)

我必须在几个 GCS 目录上运行这个函数,每个目录都有大约 500K 个文件。由于读取/写入这么多小文件的瓶颈,单个目录的这个过程将花费大约 24 小时......如果我能使它更加并行以加快速度,那就太好了,因为这似乎是一项任务有助于并行化。

编辑:下面的解决方案很有帮助,但我对从 python 脚本中并行运行特别感兴趣。 Pandas 正在处理一些数据清理,使用 bq load 会抛出错误。有 asyncio还有这个gcloud-aio-storage这两者似乎都可能对这项任务有用,可能是比线程或多处理更好的选择......

最佳答案

与其将并行处理添加到您的 Python 代码,不如考虑并行调用您的 Python 程序多次。这是一个技巧,它更适合在命令行上获取文件列表的程序。因此,为了这篇文章,让我们考虑更改程序中的一行:

你的线路:

# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) # 

换行:

files = sys.argv[1:]  # ok, import sys, too

现在,您可以这样调用您的程序:

PROCESSES=100
get_gcs_file_list.py | xargs -P $PROCESSES your_program

xargs 现在将采用 get_gcs_file_list.py 输出的文件名并并行调用 your_program 多达 100 次,以适应尽可能多的文件每行都可以命名。我相信文件名的数量限于 shell 允许的最大命令大小。如果 100 个进程不足以处理您的所有文件,xargs 将再次(又一次)调用 your_program,直到它从 stdin 读取的所有文件名都得到处理。 xargs 确保同时运行的 your_program 调用不超过 100 个。您可以根据主机可用的资源来改变进程数。

关于Python 将 GCS 中的 .json 文件并行读取到 pandas DF 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63045305/

相关文章:

python - 按名称分组并获取带有 pandas 分层列的数据框

python - Pandas - Vlookup - 搜索列中的重复值

multithreading - knuthBendix 算法不能被 Control.Parallel 并行化?

使用 OpenMP 的 C++ 并行平均矩阵

python - 为什么 int 在 Python 中需要三倍的内存?

python - 如果值为零,则从库存(字典)中删除项目

python - 如何操作上下文管理器的 __exit__ 中的异常?

python - smoothxmpp 线程身份验证

python - 可编辑 QTableView 中的 Pandas df : remove check boxes

c - 并行矩阵计算