java - 在 Apache Beam 中读取 CSV 文件时跳过 header

标签 java google-cloud-platform google-cloud-dataflow apache-beam

我想跳过 CSV 文件的标题行。截至目前,我在将 header 加载到谷歌存储之前手动删除 header 。

下面是我的代码:

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));

我已经检查了 stackoverflow 中的现有问题,但我觉得它没有希望:Skipping header rows - is it possible with Cloud DataFlow?

有什么帮助吗?

编辑:我已经尝试过类似下面的方法并且有效:

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {  
            String[] strArr2 = c.element().split(",");
            String header = Arrays.toString(strArr2);
            ClassFinance fin = new ClassFinance();

                if(header.contains("Beneficiary"))
                System.out.println("Header");
                else {
            fin.setBeneficiaryFinance(strArr2[0].trim());
            fin.setCatlibCode(strArr2[1].trim());
            fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
            fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
            fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
            fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
            c.output(fin);
            }
        }
    }));

最佳答案

您分享的较旧的 Stack Overflow 帖子 ( Skipping header rows - is it possible with Cloud DataFlow? ) 确实包含您问题的答案。

此选项在 Apache Beam SDK 中当前不可用,尽管 Apache Beam JIRA 问题跟踪器中有一个开放的功能请求BEAM-123 .请注意,在撰写本文时,此功能请求仍处于打开状态且未得到解决,而且这种情况已经持续了 2 年。然而,从这个意义上说,似乎正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您随时了解该 JIRA 问题的最新情况,因为它最近已移至 sdk -java-core 组件,它可能会在那里受到更多关注。

考虑到这些信息,我会说您使用的方法(在将文件上传到 GCS 之前删除 header )是您的最佳选择。我会避免手动执行此操作,因为您可以轻松编写脚本并自动执行删除 header 上传文件 过程。


编辑:

我已经能够使用 DoFn 想出一个简单的过滤器。它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,您可以根据自己的需要对其进行调整。它要求您事先知道正在上传的 CSV 文件的 header (因为它将按元素内容过滤),但同样,将其作为您可以根据需要修改的模板:

public class RemoveCSVHeader {
  // The Filter class
  static class FilterCSVHeaderFn extends DoFn<String, String> {
    String headerFilter;

    public FilterCSVHeaderFn(String headerFilter) {
      this.headerFilter = headerFilter;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String row = c.element();
      // Filter out elements that match the header
      if (!row.equals(this.headerFilter)) {
        c.output(row);
      }
    }
  }

  // The main class
  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));

    String header = "col1,col2,col3,col4";

    vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
        .apply(TextIO.write().to("out"));

    p.run().waitUntilFinish();
  }
}

关于java - 在 Apache Beam 中读取 CSV 文件时跳过 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50855647/

相关文章:

java - JTable 在 View 中显示和更新行号

java - Spring Boot + Spring Data Multi-Tenancy

java - 递归进入executePendingTransactions,即使使用getChildFragmentManager

java - 使用 TomEE 和 IntelliJ IDEA 的 RESTful 服务

java - 谷歌对话流 : The Application Default Credentials are not available

google-cloud-dataflow - 数据流在 g1-small 实例上失败 java.io.IOException

google-app-engine - 每次使用 App Engine 中的数据存储都会超时

go - 将行写入 Cloud Bigtable 时出现客户端错误

python - 没有可用的用户状态上下文谷歌云数据流?

scala - 使用 Dataflow 将 PubSub 流写入 Cloud Storage 时出错