java - 云数据流:Reading entire json array file from Cloud Storage and create a PCollection of json object

标签 java google-cloud-dataflow

我有一个 json 数组文件,内容如下

[ {  
  "MemberId" : "1234",  
  "Date" : "2017-07-03",  
  "Interactions" : [ {  
    "Number" : "1327",  
    "DwellTime" : "00:03:05"    
  } ]  
}, {  
  "MemberId" : "5678",  
  "Date" : "2017-07-03",  
  "Interactions" : [ {  
    "Number" : "1172",  
    "DwellTime" : "00:01:26"  
  } ]  
} ] 

我想创建一个映射到 Json 数组中存在的每个 Json 的 Java 对象的 PCollection

最佳答案

像这样的 JSON 格式(记录分布在多行而不是每行一个)对于像 Beam/Dataflow 这样的数据处理工具来说很难并行处理 - 从文件中的随机点开始,你无法确定下一个点在哪里记录开始。您可以通过从文件开头读取来做到这一点,但是这样您就没有真正并行读取。

如果可能的话,重新格式化它,使其每行一条记录,这样您就可以使用 TextIO 之类的东西来读取文件。

如果没有,您需要一次性读取该文件。

我建议几种可能的方法:

编写一个使用 gcs API 从文件中读取的 ParDo

这非常简单。您将在一个 ParDo 中完成所有读取,并且需要在该 ParDo 内实现连接代码。在 pardo 内部,您将编写与在普通 java 程序中读取文件相同的代码。 pardo 会将每个 java 对象作为记录发出。

实现基于文件的源

基于文件的源将起作用 - 当 fileOrPatternSpec 为“gs://...”时,它知道如何从 GCS 读取。您需要确保将 fileMetadata.isReadSeekEfficient 设置为 false,这样它就不会尝试拆分文件。我还没有尝试过,但我相信正确的方法是将其设置在 FBS 的单个文件构造函数内(即,您的类重写 FileBaseSource(MetaData, long, long)

TextSource/XmlSource(及其随附的包装器 TextIO/XmlIO)就是这样的示例,只不过它们尝试实现拆分 - 您的会简单得多,因为它不会。

关于java - 云数据流:Reading entire json array file from Cloud Storage and create a PCollection of json object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44934866/

相关文章:

java - 如何使用java + selenium设置chrome浏览器语言?

google-cloud-dataflow - 在多个管道上创建具有多个输入源的单个管道(每个管道都定义了单独的输入源)的意义是什么?

google-cloud-dataflow - 如何以有效的方式计算数据流中文件的行数?

python - 如何组合两个结果并将其通过管道传输到 apache-beam 管道中的下一步

google-cloud-platform - 基于 TaggedOutput 数量动态 fork Beam(数据流)管道

java - JDBC 和 SQLException

java - 使用 MediaPlayer 获取原始资源的持续时间长度

google-cloud-dataflow - GroupByKey 转换的早期结果

java - endAllStagingAnimators 说明

java - 想在jvm上实现dart-lang应该学什么?