python - Apache Beam python Bigquery 将流式插入更改为批量插入?

标签 python google-bigquery google-cloud-dataflow apache-beam

我正在运行一个 apache beam 数据流作业,它从存储桶中读取数据,执行一些转换并写入 bigquery。 但是记录被插入到流式缓冲区中。

validated_data = (p1
                  | 'Read files from Storage '+url >> beam.io.ReadFromText(url)
                  | 'Validate records ' + url >> beam.Map(data_ingestion.validate, url)\
                  .with_outputs(SUCCESS_TAG_KEY, FAILED_TAG_KEY, main="main")
)
all_data, _, _ = validated_data
success_records = validated_data[SUCCESS_TAG_KEY]
failed_records = validated_data[FAILED_TAG_KEY]


(success_records
 | 'Extracting row from tagged row {}'.format(url) >> beam.Map(lambda row: row['row'])
 | 'Write to BigQuery table for {}'.format(url) >> beam.io.WriteToBigQuery(
            table=data_ingestion.get_table(tmp=TEST, run_date=data_ingestion.run_date),
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
)

实际上,我需要在上面运行之前删除分区,以避免摄取时间分区表的重复记录。

如果我对同一个文件运行此作业超过 1 次,而不截断表格,表格最终将有重复的记录。

并且由于最后的记录在流缓冲区中,删除分区表命令实际上并没有删除分区。 下面是我用来截断表的代码。并且此代码在运行管道之前运行

client = bigquery.Client()
dataset = TABLE_MAP['dataset']
table = TABLE_MAP[sentiment_pipeline][table_type]['table']
table_id = "{}${}".format(table, format_date(run_date, '%Y%m%d'))
table_ref = client.dataset(dataset).table(table_id)
output = client.delete_table(table_ref)

最佳答案

根据 BigQuery 文档,您可能需要等待 30 minutes in order to make a DML statement on a a streaming table ,并且架构更改为 delete/truncate tables might result in data loss for some scenarios . Here您可以尝试一些解决方法来处理流媒体场景中的重复项。

此外,Apache BeamDataflow现在支持 python 的批量插入,因此这可能是避免流限制的好方法。

关于python - Apache Beam python Bigquery 将流式插入更改为批量插入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54153333/

相关文章:

go - 使用 json key 文件从 golang 登录到 bigquery

sql - 谷歌 bigquery 中的 ST_BUFFER?

google-bigquery - 从 BigQuery 中的一个句子中提取最后 16 个字符

elasticsearch - 从 Google Dataflow 流作业保存到 Elasticsearch

google-cloud-dataflow - 如何运行 Apache Beam 集成测试?

python - 将列表作为行和列索引添加到 pandas 数据框

python - 给定预定义的字符/值对,将字符映射到其值

Python Apache Beam : date value out of range

python - 普温汽车 : How to start an application without GUI error

Python理解递归程序