java - 如何在 Java 中将数据存储区实体转换为 BigQuery TableRow 对象?

标签 java google-app-engine google-cloud-dataflow

我有以下 DoFN 函数可以做到这一点,但没有我可以找到的有关它的问题的文档。

  • 问题 1 是如何自动转换 key ,以便在 BigQuery 中构造它们,就像 BigQuery 在导入表单数据存储区备份文件时所做的那样?
  • 问题 2 是如何处理时间戳?下面的代码破坏了管道并显示以下消息:

JSON object specified for non-record field: timestamp

这是我编写的代码:

public class SensorObservationEntityToRowFn extends DoFn<Entity, TableRow> {
    /**
     * In this example, put the whole string into single BigQuery field.
     */
    @Override
    public void processElement(ProcessContext c) {
        Map<String, Value> props = getPropertyMap(c.element());
        TableRow row = new TableRow();
        row.set("id", c.element().getKey().getPathElement(c.element().getKey().getPathElementCount()-1).getId());
        if (
                props.get("property1") != null &&
                props.get("property2") != null
                ) {
            // Map data from the source Entity to the destination TableRow
            row.set("property1", props.get("property1").getStringValue());
            row.set("property2", props.get("property2").getStringValue());
        }
        row.set("source_type", props.get("source_type").getStringValue());
        DateTime dateTime = new DateTime(props.get("timestamp").getTimestampMicrosecondsValue()/1000L);
        row.set("timestamp", dateTime);
        // Output new TableRow only if all data is present in the source
        c.output(row);
    }
}

最佳答案

我的期望是在助手类中找到一些东西,但我没有成功。我猜 Google 仍在向他们的 API 添加新的部分。也许在下一个版本中。 最大的问题是API有点不直观并且与其他部分不一致。实体的键应该有它自己的访问器方法,而不是像这样挖掘祖先路径(获取路径数组的最后一个元素):

getKey().getPathElement(c.element().getKey().getPathElementCount()-1).getId()

时间戳的第二个问题:也有点不优雅。我在文档中找不到如何从 API 角度(数据类型、字段长度、格式等)在 Datastore 或 BigQuery 中格式化时间戳的信息。现在有效的解决方案需要第三方库(“joda”):

import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;

以及下面的数据翻译。您必须记住,在一个地方以毫秒为单位,在另一个地方以微秒为单位。又一个不必要的困惑。

DateTime dateTime = new DateTime(props.get("timestamp").getTimestampMicrosecondsValu‌​e()/1000L);

row.set("timestamp", ISODateTimeFormat.dateTime().print(dateTime));

希望这可以帮助其他人使用 Dataflow 并将数据从一个地方移动到另一个地方。

关于java - 如何在 Java 中将数据存储区实体转换为 BigQuery TableRow 对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39193150/

相关文章:

java - API 调用返回 Null

java - JMS 和 AMQP - RabbitMQ

python - 谷歌应用引擎 webapp2 : No module named pkg_resources error when importing bigquery

google-app-engine - 如何在 Google Cloud Platform 上安排 Cron 作业?

JAVA-Apache BEAM-GCP : GroupByKey works fine with Direct Runner but fails with Dataflow runner

python - 数据流作业中出现 ModuleNotFoundError

java - 编译错误 : duplicate class with MapStruct in IntelliJ IDEA 2019. 1

java - java.lang.OutOfMemoryError : Java heap space , Java Swing 应用程序的工具

google-app-engine - 在 Google App Engine mapreduce 的 python 版本中,如何从 done_callback 访问计数器?

java - 将 STRUCT 数组从 Dataflow 写入大查询