java - 如何使用Spark处理CSV数据并输出序列化的自定义JavaObject?

标签 java apache-spark

我有一个包含小 CSV 文件的文件夹,其中包含目录:“/catmydata/2017/1/1、/catdata/2017/1/2”,每个目录都包含一堆 CSV,其中 CSV 中的每一行都有 cat 适应。

我想使用spark分配负载并转换数据并将其粘贴到输出目录中的二进制JavaObject文件中,类似于我的源数据“/convertedcatdata/2017/1/1/adoptedcatjavaobjectbinaryfile,/convertedcatdata/2017/1/1/adoptedcatjavaobjectbinaryfile”

我将使用的 Java 对象是这样的:

public class Cat{
    private Date dateOfAdoption;
    private String name;
    private Boolean isMale;
    private Person owner;

    public Cat(String name, Boolean isMale, Person owner, Date dateOfAdoption){
       this.name = name;
       this.isMale = isMale;
       this.owner = owner;
       this.dateOfAdoption = dateOfAdoption;
    }
}

public class AdoptedCats{
    public List<Cat> catList;

    public AdoptedCats(){
    }
}

我习惯使用 python Spark 将 csv 文件转换为 avro,但问题是在这种情况下我的输出文件是一个已填充的 Java 对象类。我怎样才能创建这样一个文件,其中它是一个序列化的 java 对象,其中包含 AdoptedClass 对象,该对象填充了 CSV 中列出的所有猫?

我的意思是,如果有人编写 Java 代码(非 Spark)并打开序列化的“adoptedcatjavaobjectbinaryfile”,他们就可以访问 Java 类的内容。说:

AdoptedCats adoptedCats = <the adoptedcatjavaobjectbinaryfile>
System.out.println(adoptedCats.cats.length) // this would output say, 75 if there were 75 rows in the original CSV.

我假设我必须使用Java Spark...但我不确定如何创建spark脚本,以便它可以划分并征服一月份整个月的所有cat目录,并将它们输出到各自的目录中。任何帮助或示例都会非常有帮助。

最佳答案

实现这一目标的步骤-

  1. 假设文件(我有一个包含目录的小 CSV 文件的文件夹)是 /somepath/catmydata.csv

        JavaSparkContext context  = ......;
    
        //Retrieve list of files from master file
    
        List<String> files = context.textFile("/somepath/catmydata.csv")
                .flatMap(row->Arrays.asList(row.split(",")).iterator()).collect();
    
       //Iterate through each file and save it as object file
       Map<String,JavaRDD<Cat>> adoptedCatsMap =new HashMap<String,JavaRDD<Cat>>();
        for (String file:files) {
    
           // Extract file path and attach to convert
            String output=String.format("/convertedcatdata/%s",file.substring(file.indexOf('/'),file.length()))
    
           JavaRDD<Cat> adoptedCatsRdd = context.textFile(file).map(s->new Cat(s));
    
           adoptedCatsMap.put(output,adoptedCatsRdd);
           //This will write number of files based on rdd partitions,
           // if you want only one file in a directory then save it 
           // to temporary directory then use copyMerge to create one file using Hadoop fs FileUtil
           // https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html
          //Save to physical location if required.
           adoptedCatsRdd.saveAsObjectFile(output);
        }
    

我希望这会有所帮助。

关于java - 如何使用Spark处理CSV数据并输出序列化的自定义JavaObject?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46535922/

相关文章:

java - 在Android Kotlin上使用 Canvas mask 输入的文本

scala - ElasticClient提供NullPointerException

jdbc - 按关键 "within"分区进行高效分组

elasticsearch - 如何通过Spark Streaming实时更新Elasticsearch文档?

scala - UDAF Spark 中的多列输出

java - 使用可执行 Jar 时将 phantomjs 作为本地资源启动

java - 正则表达式中的选项模式定界符

java - java中同步块(synchronized block)的部分执行

java - SPARK:java.lang.IllegalStateException:找不到任何构建目录

java - 当另一个组件更新时,自定义 JPanel 中出现奇怪的点和线