google-bigquery - 将数据流式传输到 Google BigQuery 表 : problems using InsertId to De-Duplicate Records

标签 google-bigquery apache-camel spring-jms spring-camel

我们使用 Camel BigQuery API(版本 2.20)将记录从 ActiveMQ 服务器(版本 5.14.3)上的消息队列流式传输到 Google BigQuery 表中.

我们已经在主站点上运行的 Spring 框架中实现并部署了流机制作为 XML 路由定义,并且它似乎运行良好。

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        ./spring-beans.xsd
        http://camel.apache.org/schema/spring
        ./camel-spring.xsd">

    <!--
    # ==========================================================================
    # ActiveMQ JMS Bean Definition
    # ==========================================================================
    -->
    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="nio://192.168.10.10:61616?jms.useAsyncSend=true" />
                <property name="userName"  value="MyAmqUserName" />
                <property name="password"  value="MyAmqPassword" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # GoogleBigQueryComponent
    # https://github.com/apache/camel/tree/master/components/camel-google-bigquery
    # ==========================================================================
    -->
    <bean id="gcp" class="org.apache.camel.component.google.bigquery.GoogleBigQueryComponent">
        <property name="connectionFactory">
            <bean class="org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory">
                <property name="credentialsFileLocation" value="MyDir/MyGcpKeyFile.json" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # Main Context Bean Definition
    # ==========================================================================
    -->
    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring" >

        <!--
        # ==================================================================
        # Message Route :
        # 1. consume messages from my AMQ queue
        # 2. set the InsertId / INSERT_ID (it is not clear which is the correct one)
        # 3. write message to Google BigQuery table
        # see https://github.com/apache/camel/blob/master/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
        # ==================================================================
        <log message="${headers} | ${body}" />
        -->
        <route>
            <from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&amp;concurrentConsumers=20" />
            <setHeader headerName="CamelGoogleBigQuery.InsertId">
                <simple>${header.KeyValuePreviouslyGenerated}</simple>
            </setHeader>
            <setHeader headerName="GoogleBigQueryConstants.INSERT_ID">
                <simple>${header.KeyValuePreviouslyGenerated}</simple>
            </setHeader>
            <to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
        </route>

    </camelContext>

</beans>

为了获得更高的可用性,我们现在已将相同的实现部署到我们的备份站点,流式传输到相同的目标 BigQuery 表。正如预期的那样,相同的记录从两个站点流入同一个表,存在重复的记录。为了消除记录重复,我们尝试遵循此处给出的指导:

https://camel.apache.org/staging/components/latest/google-bigquery-component.html

消息 header 部分建议使用合适的运行时键值设置名为 CamelGoogleBigQuery.InsertId 的消息 header 。

但是,在同一页面下方的确保数据一致性部分,建议设置GoogleBigQueryConstants.INSERT_ID

我们已检查我们的主服务器和备份服务器是否在同一时区 (UTC) 中运行,并且我们正在生成我们认为合适的运行时唯一键:包含最接近的 UNIX 时间的字符串第二。

上面的代码示例显示我们已经尝试了这两种方法,但是对目标 BigQuery 表中的数据进行检查表明这两种方法似乎都不起作用,即我们仍然有重复的记录。

问题

  1. 上面代码中设置 InsertID/INSERT_ID 的方式是否有错误?
  2. 您是否使用 Camel Google BigQuery API 将数据流式传输到 BigQuery 中?
  3. 如果是,您是否成功使用了 InsertId/INSERT_ID 重复数据删除机制?如果是,是哪一个以及如何实现?
  4. 您观察到了哪些重复数据删除时间窗口?

最佳答案

GoogleBigQueryConstants.INSERT_ID 是值为 CamelGoogleBigQueryInsertId 的字符串常量。

像这样使用它:

<setHeader headerName="CamelGoogleBigQueryInsertId">
    <simple>${header.KeyValuePreviouslyGenerated}</simple>
</setHeader>

演示此行为的单元测试位于:InsertIdTest.java


有关这些 header 的文档有点过时,我已修复它,正确的版本可以在 google-bigquery-component.adoc 中找到。 。很快就会在网站上发布。

关于google-bigquery - 将数据流式传输到 Google BigQuery 表 : problems using InsertId to De-Duplicate Records,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57577379/

相关文章:

spring - 目标尝试恢复的 JMS 消息监听器调用程序的设置失败

sql - 如何在 BigQuery 中查看整个 session 路径?

arrays - 如何切片 BigQuery 数组 - 选择除最后一项以外的所有项目

java - 是否可以在 beanRef 的方法中检索 activeMQ 主题

java - 为什么 Camel noErrorHandler 使用堆栈跟踪记录警告消息?

java - 将 JMS 配置到 Spring 应用程序中意味着什么?

spring-boot - Spring Boot sleuth 与 jms 集成

php - PHP 中当前 Google API 的 BigQuery 示例

google-bigquery - 如何在 BigQuery 插入错误时崩溃/停止 DataFlow Pub/Sub 摄取

java - Camel : how to route data from process method to another pipeline?