TL;DR asyncio
vs multi-processing
vs threading
vs. 一些其他解决方案
并行化从 GCS 读取文件的 for 循环,然后将这些数据一起附加到 pandas 数据帧中,然后写入 BigQuery...
我想并行执行一个 python 函数,该函数从 GCS 目录读取数十万个小 .json 文件,然后将这些 .jsons 转换为 pandas 数据帧,然后将 pandas 数据帧写入 BigQuery 表。
这是函数的非并行版本:
import gcsfs
import pandas as pd
from my.helpers import get_gcs_file_list
def load_gcs_to_bq(gcs_directory, bq_table):
# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) #
# Create new table
output_df = pd.DataFrame()
fs = gcsfs.GCSFileSystem() # Google Cloud Storage (GCS) File System (FS)
counter = 0
for file in files:
# read files from GCS
with fs.open(file, 'r') as f:
gcs_data = json.loads(f.read())
data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
this_df = pd.DataFrame(data)
output_df = output_df.append(this_df)
# Write to BigQuery for every 5K rows of data
counter += 1
if (counter % 5000 == 0):
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
output_df = pd.DataFrame() # and reset the dataframe
# Write remaining rows to BigQuery
pd.DataFrame.to_gbq(output_df, bq_table, project_id=my_id, if_exists='append')
这个函数很简单:
- grab
['gcs_dir/file1.json', 'gcs_dir/file2.json', ...]
,GCS中的文件名列表 - 遍历每个文件名,并且:
- 从 GCS 读取文件
- 将数据转换为 pandas DF
- 附加到主 pandas DF
- 每 5K 循环,写入 BigQuery(因为随着 DF 变大,追加变得更慢)
我必须在几个 GCS 目录上运行这个函数,每个目录都有大约 500K 个文件。由于读取/写入这么多小文件的瓶颈,单个目录的这个过程将花费大约 24 小时......如果我能使它更加并行以加快速度,那就太好了,因为这似乎是一项任务有助于并行化。
编辑:下面的解决方案很有帮助,但我对从 python 脚本中并行运行特别感兴趣。 Pandas 正在处理一些数据清理,使用 bq load
会抛出错误。有 asyncio还有这个gcloud-aio-storage这两者似乎都可能对这项任务有用,可能是比线程或多处理更好的选择......
最佳答案
与其将并行处理添加到您的 Python 代码,不如考虑并行调用您的 Python 程序多次。这是一个技巧,它更适合在命令行上获取文件列表的程序。因此,为了这篇文章,让我们考虑更改程序中的一行:
你的线路:
# my own function to get list of filenames from GCS directory
files = get_gcs_file_list(directory=gcs_directory) #
换行:
files = sys.argv[1:] # ok, import sys, too
现在,您可以这样调用您的程序:
PROCESSES=100
get_gcs_file_list.py | xargs -P $PROCESSES your_program
xargs
现在将采用 get_gcs_file_list.py
输出的文件名并并行调用 your_program
多达 100 次,以适应尽可能多的文件每行都可以命名。我相信文件名的数量限于 shell 允许的最大命令大小。如果 100 个进程不足以处理您的所有文件,xargs 将再次(又一次)调用 your_program
,直到它从 stdin 读取的所有文件名都得到处理。 xargs
确保同时运行的 your_program
调用不超过 100 个。您可以根据主机可用的资源来改变进程数。
关于Python 将 GCS 中的 .json 文件并行读取到 pandas DF 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63045305/