google-bigquery - Python Apache 光束 : BigQuery streaming deduplication by row_id

标签 google-bigquery google-cloud-dataflow apache-beam

根据 BigQuery 文档,您可以通过提供 insertId ( https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency ) 来确保数据的一致性。如果未提供,BQ 将尝试根据内部 ID 和尽力而为来确保一致性。

使用 BQ API,您可以使用 row_ids 参数 ( https://google-cloud-python.readthedocs.io/en/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html#google.cloud.bigquery.client.Client.insert_rows_json ) 来做到这一点,但我找不到 Apache Beam Python SDK 的相同参数。

查看 SDK 我注意到存在“unique_row_id”属性,但我真的不知道如何将我的参数传递给 WriteToBigQuery()

我如何写入 BQ(流式传输)以提供用于重复数据删除的行 ID?

最佳答案

更新:

If you use WriteToBigQuery then it will automatically create and insert a unique row id called insertId for you, which will be inserted to bigquery. It's handled for you, you don't need to worry about it. :)

  1. WriteToBigQuery 是一个PTransform,在它的expand 方法中调用BigQueryWriteFn
  2. BigQueryWriteFn 是一个DoFn,在它的process 方法中调用_flush_batch
  3. _flush_batch 是一个方法,然后调用 BigQueryWrapper.insert_rows 方法
  4. BigQueryWrspper.insert_rows 创建一个 bigquery.TableDataInsertAllRequest.RowsValueListEntry 对象列表,其中包含 insertId 和作为 json 对象的行数据
  5. insertId 是通过调用 unique_row_id 方法生成的,该方法返回一个由 UUID4 与 _ 连接的值和一个自动递增的数字组成的值.

在目前的2.7.0代码中,有这个开心的注释;我也证实了这是真的:) https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1182

# Prepare rows for insertion. Of special note is the row ID that we add to
# each row in order to help BigQuery avoid inserting a row multiple times.
# BigQuery will do a best-effort if unique IDs are provided. This situation
# can happen during retries on failures.

* 不要使用BigQuerySink

至少,不是当前形式,因为它不支持流式传输。我想这可能会改变。


原始(非)答案

好问题,我也看过但找不到特定答案。

Apache Beam 似乎没有使用您链接到的那个 google.cloud.bigquery 客户端 sdk,它有一些内部生成的 api 客户端,但它似乎是最新的。

我看了下源码: insertall 方法在那里 https://github.com/apache/beam/blob/18d2168ee71a1b1b04976717f0f955199bb00961/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py#L476

我还发现提到的 insertid https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py#L1707

因此,如果您可以进行 InsertAll 调用,它将使用 TableDataInsertAllRequest 并传递 RowsValueListEntry

class TableDataInsertAllRequest(_messages.Message):
  """A TableDataInsertAllRequest object.
  Messages:
    RowsValueListEntry: A RowsValueListEntry object.

RowsValueListEntry 消息是 insertid 所在的位置。

这是用于全部插入的 API 文档 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

我会进一步研究这个,因为我没有看到 WriteToBigQuery() 公开这个。

我怀疑“bigquery 会记住它至少一分钟”是一个非常松散的重复数据删除保证。如果您需要事务,文档建议使用数据存储。否则,您可能需要使用窗口函数运行 SQL 以在运行时进行重复数据删除,或者在 bigquery 上运行其他一些重复数据删除作业。

也许使用 WriteToBigQuery()batch_size 参数,并在数据流中运行组合(或最坏情况下是 GroupByKey)步骤是一种更稳定的方法来删除先前的重复数据写作。

关于google-bigquery - Python Apache 光束 : BigQuery streaming deduplication by row_id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51080848/

相关文章:

indexing - Bigquery : Find the index at which a specified element is first found within an array

google-bigquery - 云数据流失败并出现错误无法在不同位置处理数据

google-cloud-platform - Google Cloud Dataflow 和 Google Cloud Dataproc 有什么区别?

java - Cloud Dataflow BQ 输出因 TLS 握手错误而挂起作业

java - 如何将进程中的Json对象放入Google数据流中的Bigquery表中

java - 使用 Google Dataflow 在批处理模式下使用 KafkaIO 进行消费

javascript - BigQuery 与 "signed urls"的相似度

google-bigquery - 是否可以链接到 bigquery 控制台中的作业?

google-cloud-platform - GCP 数据流与云函数

python-3.x - 为 WritetoFiles 设置文件名