python - 大查询存储。 Python。并行读取多个流(多处理)

标签 python google-cloud-platform google-bigquery multiprocessing google-cloud-storage

我正在尝试使用 BALANCED ShardingStrategy 获得超过 1 个流和 python 多处理库来并行读取流。

但是,当并行读取流时,返回相同的行号和数据。因为,如果我理解正确的话,在开始读取和完成之前没有数据分配给任何流,所以两个并行流尝试读取相同的数据,结果永远不会读取一部分数据。

使用 LIQUID 策略,我们可以从一个流中读取所有数据,该流不能拆分。

根据文档,可以使用 BALANCED 并行读取多个流。但是,我不知道如何并行读取并将不同的数据分配给每个流

我有以下玩具代码:

import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
from multiprocessing import Pool
import multiprocessing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "ethereum_blockchain"
table_ref.table_id = "contracts"

parent = "projects/{}".format(your_project_id)
session = bq_storage_client.create_read_session(
    table_ref,
    parent,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

def read_rows(stream_position, session=session):
    reader = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]), timeout=100000).to_arrow(session).to_pandas()
    return reader

if __name__ ==  '__main__': 
    p = Pool(2)
    output = p.map(read_rows,([i for i in range(0,2)]))
    print(output)

需要帮助才能并行读取多个流。 可能有一种方法可以在读取开始之前将数据分配给流。任何代码示例或解释和提示将不胜感激

最佳答案

对于部分回答我深表歉意,但它不适合发表评论。

LIQUID 或 BALANCED 只影响数据如何分配给流,而不影响数据到达多个流的事实(参见 here)。

当我使用此 read_rows 函数运行您的代码变体时,我看到两个流的第一行有不同的数据,因此我无法通过任何一种着色策略在该数据集上看到相同的数据来重现您的问题。

def read_rows(stream_position, session=session):
    reader = bq_storage_client.read_rows(
      bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]),timeout=100000)

    for row in reader.rows(session):
      print(row)
      break

我在 Linux 计算引擎实例上运行这段代码。

不过,我确实担心您在 map 调用中要求的输出会非常大。

关于python - 大查询存储。 Python。并行读取多个流(多处理),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58100151/

相关文章:

google-bigquery - 如何在 BigQuery 中执行 Pandas 列转换?

python - 如何在不导入 RAM 的情况下使用 numpy 文件?

google-bigquery - 直接上传到 BigQuery 比上传到 Cloud Storage 更快吗?

google-bigquery - BigQuery - 加载具有空值的 JSON 字段

python - BigQuery 类型错误 : to_pandas() got an unexpected keyword argument 'timestamp_as_object'

google-cloud-platform - 在现有大查询表上添加集群

python - 带链接轴的 PyQtGraph 网格

python - np.float 不匹配 np.float32 和 np.float64

python - "Can' t 连接到 HTTPS URL,因为 SSL 模块不可用。”

node.js - 为 GKE kubernetes 集群选择 Node 大小