我有一个 json 数组文件,内容如下
[ {
"MemberId" : "1234",
"Date" : "2017-07-03",
"Interactions" : [ {
"Number" : "1327",
"DwellTime" : "00:03:05"
} ]
}, {
"MemberId" : "5678",
"Date" : "2017-07-03",
"Interactions" : [ {
"Number" : "1172",
"DwellTime" : "00:01:26"
} ]
} ]
我想创建一个映射到 Json 数组中存在的每个 Json 的 Java 对象的 PCollection
最佳答案
像这样的 JSON 格式(记录分布在多行而不是每行一个)对于像 Beam/Dataflow 这样的数据处理工具来说很难并行处理 - 从文件中的随机点开始,你无法确定下一个点在哪里记录开始。您可以通过从文件开头读取来做到这一点,但是这样您就没有真正并行读取。
如果可能的话,重新格式化它,使其每行一条记录,这样您就可以使用 TextIO 之类的东西来读取文件。
如果没有,您需要一次性读取该文件。
我建议几种可能的方法:
编写一个使用 gcs API 从文件中读取的 ParDo
这非常简单。您将在一个 ParDo 中完成所有读取,并且需要在该 ParDo 内实现连接代码。在 pardo 内部,您将编写与在普通 java 程序中读取文件相同的代码。 pardo 会将每个 java 对象作为记录发出。
实现基于文件的源
基于文件的源将起作用 - 当 fileOrPatternSpec 为“gs://...”时,它知道如何从 GCS 读取。您需要确保将 fileMetadata.isReadSeekEfficient 设置为 false,这样它就不会尝试拆分文件。我还没有尝试过,但我相信正确的方法是将其设置在 FBS 的单个文件构造函数内(即,您的类重写 FileBaseSource(MetaData, long, long)
TextSource/XmlSource(及其随附的包装器 TextIO/XmlIO)就是这样的示例,只不过它们尝试实现拆分 - 您的会简单得多,因为它不会。
关于java - 云数据流:Reading entire json array file from Cloud Storage and create a PCollection of json object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44934866/