根据 BigQuery 文档,您可以通过提供 insertId
( https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency ) 来确保数据的一致性。如果未提供,BQ 将尝试根据内部 ID 和尽力而为来确保一致性。
使用 BQ API,您可以使用 row_ids
参数 ( https://google-cloud-python.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html#google.cloud.bigquery.client.Client.insert_rows_json ) 来做到这一点,但我找不到 Apache Beam Python SDK 的相同参数。
查看 SDK 我注意到存在“unique_row_id”属性,但我真的不知道如何将我的参数传递给 WriteToBigQuery()
我如何写入 BQ(流式传输)以提供用于重复数据删除的行 ID?
最佳答案
更新:
If you use
WriteToBigQuery
then it will automatically create and insert a unique row id calledinsertId
for you, which will be inserted to bigquery. It's handled for you, you don't need to worry about it. :)
WriteToBigQuery
是一个PTransform
,在它的expand
方法中调用BigQueryWriteFn
BigQueryWriteFn
是一个DoFn
,在它的process
方法中调用_flush_batch
_flush_batch
是一个方法,然后调用BigQueryWrapper.insert_rows
方法BigQueryWrspper.insert_rows
创建一个bigquery.TableDataInsertAllRequest.RowsValueListEntry
对象列表,其中包含insertId
和作为 json 对象的行数据insertId
是通过调用unique_row_id
方法生成的,该方法返回一个由 UUID4 与_
连接的值和一个自动递增的数字组成的值.
在目前的2.7.0代码中,有这个开心的注释;我也证实了这是真的:) https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1182
# Prepare rows for insertion. Of special note is the row ID that we add to # each row in order to help BigQuery avoid inserting a row multiple times. # BigQuery will do a best-effort if unique IDs are provided. This situation # can happen during retries on failures.
* 不要使用BigQuerySink
至少,不是当前形式,因为它不支持流式传输。我想这可能会改变。
原始(非)答案
好问题,我也看过但找不到特定答案。
Apache Beam 似乎没有使用您链接到的那个 google.cloud.bigquery 客户端 sdk,它有一些内部生成的 api 客户端,但它似乎是最新的。
我看了下源码:
insertall
方法在那里 https://github.com/apache/beam/blob/18d2168ee71a1b1b04976717f0f955199bb00961/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py#L476
我还发现提到的 insertid
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py#L1707
因此,如果您可以进行 InsertAll 调用,它将使用 TableDataInsertAllRequest
并传递 RowsValueListEntry
class TableDataInsertAllRequest(_messages.Message):
"""A TableDataInsertAllRequest object.
Messages:
RowsValueListEntry: A RowsValueListEntry object.
RowsValueListEntry
消息是 insertid 所在的位置。
这是用于全部插入的 API 文档 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
我会进一步研究这个,因为我没有看到 WriteToBigQuery()
公开这个。
我怀疑“bigquery 会记住它至少一分钟”是一个非常松散的重复数据删除保证。如果您需要事务,文档建议使用数据存储。否则,您可能需要使用窗口函数运行 SQL 以在运行时进行重复数据删除,或者在 bigquery 上运行其他一些重复数据删除作业。
也许使用 WriteToBigQuery()
的 batch_size
参数,并在数据流中运行组合(或最坏情况下是 GroupByKey)步骤是一种更稳定的方法来删除先前的重复数据写作。
关于google-bigquery - Python Apache 光束 : BigQuery streaming deduplication by row_id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51080848/