python-3.x - 使用Python并行下载多个GCS文件(到内存中)

标签 python-3.x google-cloud-storage python-asyncio aiohttp

我有一个存储桶,里面有很多大文件(每个 500mb)。有时我需要加载多个文件,按名称引用。我一直在使用 blob.download_as_string() 函数逐一下载文件,但速度非常慢,因此我想尝试并行下载它们。

我找到了 gcloud-aio-storage 软件包,但是文档有点稀疏,特别是对于 download 功能。

我更愿意将文件下载/存储在内存中,而不是下载到本地计算机然后上传到脚本。

这是我拼凑起来的,尽管我似乎无法让它发挥作用。我不断收到超时错误。 我做错了什么?

注意:使用 python 3.7 以及所有其他软件包的最新版本。

test_download.py


from gcloud.aio.storage import Storage
import aiohttp 
import asyncio

async def gcs_download(session, bucket_name, file, storage):
    async with session: 
        bucket = storage.get_bucket(bucket_name)
        blob = await bucket.get_blob(file)
        return  await blob.download()
    

async def get_gcsfiles_async(bucket_name, gcs_files):

    async with aiohttp.ClientSession() as session:
        storage = Storage(session=session)
        coros = (gcs_download(session, bucket_name, file, storage) for file in gcs_files)
        return await asyncio.gather(*coros)
        

那么我调用/传入值的方式如下:

import test_download as test
import asyncio

bucket_name = 'my_bucket_name'
project_name = 'my_project_name'  ### Where do I reference this???

gcs_files = ['bucket_folder/some-file-2020-10-06.txt', 
            'bucket_folder/some-file-2020-10-07.txt',
            'bucket_folder/some-file-2020-10-08.txt']

result = asyncio.run(test.get_gcsfiles_async(bucket_name, gcs_files))

任何帮助将不胜感激!

这是相关问题,尽管有两件事需要注意:Google Storage python api download in parallel

  1. 当我从已批准的答案运行代码时,它最终会卡住并且永远不会下载
  2. 它是在 gcloud-aio-storage 软件包发布之前发布的,可能没有利用当前“最佳”方法。

最佳答案

看起来缺少该库的文档,但我可以运行一些东西,并且它正在我的测试中运行。通过查看代码,我发现您不需要使用 blob.download(),因为 it calls无论如何,storage.download()。我基于 usage 下面的脚本部分,处理上传,但可以重写以用于下载。 storage.download() 不会写入文件,因为这是由 storage.download_to_filename() 完成的。您可以查看可用的下载方式here .

async_download.py

import asyncio
from gcloud.aio.auth import Token
from gcloud.aio.storage import Storage

# Used a token from a service account for authentication
sa_token = Token(service_file="../resources/gcs-test-service-account.json", scopes=["https://www.googleapis.com/auth/devstorage.full_control"])

async def async_download(bucket, obj_names):
    async with Storage(token=sa_token) as client:
        tasks = (client.download(bucket, file) for file in obj_names) # Used the built in download method, with required args
        res = await asyncio.gather(*tasks)

    await sa_token.close()
    return res

ma​​in.py

import async_download as dl_test
import asyncio

bucket_name = "my-bucket-name"
obj_names = [
    "text1.txt",
    "text2.txt",
    "text3.txt"
]

res = asyncio.run(dl_test.async_download(bucket_name, obj_names))

print(res)

如果您想使用服务帐户 token ,您可以关注this guide并使用相关的auth scopes 。由于服务帐户是项目方面的,因此不需要指定项目,但我也没有看到 Session 的任何项目名称引用。虽然 GCS 的 GCP Python 库尚不支持并行下载,但有一个 feature request为此开放。目前还没有发布此版本的预计时间。

关于python-3.x - 使用Python并行下载多个GCS文件(到内存中),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70382840/

相关文章:

python - 使用 Python 为 GCS PUT 请求在 GAE 上签名的 URL

python - 异步。动态添加协程到循环

python-3.x - Python计算Grand Canonical Ensemble中的LennardJones 2D交互对相关分布函数

python - str 对象不可调用 python

python-3.x - 将行值与接下来的 2 行进行比较时,无法解决 IndexError

python非阻塞写入csv文件

python - Asyncio 与另一个协程同时运行 Dash (Flask) 服务器

python - 读取 csv 时加快日期时间格式化速度

python - 在 GAE/P 中存储 > 1MB 的文本文件

android - 使用 gsutil 和服务帐户下载应用评论