我有以下 DoFN 函数可以做到这一点,但没有我可以找到的有关它的问题的文档。
- 问题 1 是如何自动转换 key ,以便在 BigQuery 中构造它们,就像 BigQuery 在导入表单数据存储区备份文件时所做的那样?
- 问题 2 是如何处理时间戳?下面的代码破坏了管道并显示以下消息:
JSON object specified for non-record field: timestamp
这是我编写的代码:
public class SensorObservationEntityToRowFn extends DoFn<Entity, TableRow> {
/**
* In this example, put the whole string into single BigQuery field.
*/
@Override
public void processElement(ProcessContext c) {
Map<String, Value> props = getPropertyMap(c.element());
TableRow row = new TableRow();
row.set("id", c.element().getKey().getPathElement(c.element().getKey().getPathElementCount()-1).getId());
if (
props.get("property1") != null &&
props.get("property2") != null
) {
// Map data from the source Entity to the destination TableRow
row.set("property1", props.get("property1").getStringValue());
row.set("property2", props.get("property2").getStringValue());
}
row.set("source_type", props.get("source_type").getStringValue());
DateTime dateTime = new DateTime(props.get("timestamp").getTimestampMicrosecondsValue()/1000L);
row.set("timestamp", dateTime);
// Output new TableRow only if all data is present in the source
c.output(row);
}
}
最佳答案
我的期望是在助手类中找到一些东西,但我没有成功。我猜 Google 仍在向他们的 API 添加新的部分。也许在下一个版本中。 最大的问题是API有点不直观并且与其他部分不一致。实体的键应该有它自己的访问器方法,而不是像这样挖掘祖先路径(获取路径数组的最后一个元素):
getKey().getPathElement(c.element().getKey().getPathElementCount()-1).getId()
时间戳的第二个问题:也有点不优雅。我在文档中找不到如何从 API 角度(数据类型、字段长度、格式等)在 Datastore 或 BigQuery 中格式化时间戳的信息。现在有效的解决方案需要第三方库(“joda”):
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
以及下面的数据翻译。您必须记住,在一个地方以毫秒为单位,在另一个地方以微秒为单位。又一个不必要的困惑。
DateTime dateTime = new DateTime(props.get("timestamp").getTimestampMicrosecondsValue()/1000L);
row.set("timestamp", ISODateTimeFormat.dateTime().print(dateTime));
希望这可以帮助其他人使用 Dataflow 并将数据从一个地方移动到另一个地方。
关于java - 如何在 Java 中将数据存储区实体转换为 BigQuery TableRow 对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39193150/