google-bigquery - 在 BigQuery 接收器中进行一次处理的上下文中,重新洗牌是什么意思?

标签 google-bigquery apache-beam dataflow

我正在阅读 article关于由某些 Dataflow 源和接收器实现的一次性处理,我在理解 BigQuery 接收器上的示例时遇到了麻烦。
从文章

Generating a random UUID is a non-deterministic operation, so we must add a reshuffle before we insert into BigQuery. Once that is done, any retries by Cloud Dataflow will always use the same UUID that was shuffled. Duplicate attempts to insert into BigQuery will always have the same insert id, so BigQuery is able to filter them


// Apply a unique identifier to each record
c
 .apply(new DoFn<> {
  @ProcessElement
  public void processElement(ProcessContext context) {
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 }
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().record(), context.element.id());
   }
 });

什么改组 意思是如何防止在后续重试中为相同的插入生成不同的 UUID?

最佳答案

Reshuffle 以不同的方式对数据进行分组。然而,在这里它被用于它的副作用:检查点和重复数据删除。

如果没有重新洗牌,如果同一个任务生成 UUID 并将数据插入到 BigQuery,则存在工作器重新启动的风险,新工作器会生成新的 UUID 并将不同的行发送到 BigQuery,从而导致重复行。

Reshuffle 操作将 UUID 生成和 BigQuery 插入分为两个步骤,并在它们之间插入检查点和重复数据删除。

  • 首先,生成 UUID 并发送到重新洗牌。如果 UUID 生成工作程序重新启动,则没关系,因为重新洗牌会删除重复的行,从而消除失败/重新启动的工作程序中的数据。
  • 生成的 UUID 由 shuffle 操作检查点。
  • BigQuery 插入工作程序使用检查点 UUID,因此即使它重新启动 - 它也会向 BigQuery 发送完全相同的数据。
  • BigQuery 使用这些 UUID 对数据进行重复数据删除,因此在 BigQuery 中消除了来自重新启动的插入工作器的重复数据。
  • 关于google-bigquery - 在 BigQuery 接收器中进行一次处理的上下文中,重新洗牌是什么意思?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52520614/

    相关文章:

    sql - Google BigQuery SQL 用于对各个列求和

    google-bigquery - ARRAY_AGG(STRUCT(x,y,z)) 等效于 Bigquery 遗留 SQL

    google-bigquery - 如何在 GitHub-Archive 中获取具有最大星数的 java 存储库

    google-bigquery - 除了日期分区,Google BigQuery 是否支持?

    Azure 数据工厂集成运行时将无法启动

    cookies - 在 spy 网站上进行反 spy

    python - 节流光束应用中的一个步骤

    python - 云数据流 Python : Failed to install packages: failed to install workflow

    java - Apache Beam 中 DoFn 的线程同步

    python - 为可移植运行者构建 apache beam sdk 线束 - 名称问题