我有一个包含小 CSV 文件的文件夹,其中包含目录:“/catmydata/2017/1/1、/catdata/2017/1/2”,每个目录都包含一堆 CSV,其中 CSV 中的每一行都有 cat 适应。
我想使用spark分配负载并转换数据并将其粘贴到输出目录中的二进制JavaObject文件中,类似于我的源数据“/convertedcatdata/2017/1/1/adoptedcatjavaobjectbinaryfile,/convertedcatdata/2017/1/1/adoptedcatjavaobjectbinaryfile”
我将使用的 Java 对象是这样的:
public class Cat{
private Date dateOfAdoption;
private String name;
private Boolean isMale;
private Person owner;
public Cat(String name, Boolean isMale, Person owner, Date dateOfAdoption){
this.name = name;
this.isMale = isMale;
this.owner = owner;
this.dateOfAdoption = dateOfAdoption;
}
}
public class AdoptedCats{
public List<Cat> catList;
public AdoptedCats(){
}
}
我习惯使用 python Spark 将 csv 文件转换为 avro,但问题是在这种情况下我的输出文件是一个已填充的 Java 对象类。我怎样才能创建这样一个文件,其中它是一个序列化的 java 对象,其中包含 AdoptedClass 对象,该对象填充了 CSV 中列出的所有猫?
我的意思是,如果有人编写 Java 代码(非 Spark)并打开序列化的“adoptedcatjavaobjectbinaryfile”,他们就可以访问 Java 类的内容。说:
AdoptedCats adoptedCats = <the adoptedcatjavaobjectbinaryfile>
System.out.println(adoptedCats.cats.length) // this would output say, 75 if there were 75 rows in the original CSV.
我假设我必须使用Java Spark...但我不确定如何创建spark脚本,以便它可以划分并征服一月份整个月的所有cat目录,并将它们输出到各自的目录中。任何帮助或示例都会非常有帮助。
最佳答案
实现这一目标的步骤-
假设文件(我有一个包含目录的小 CSV 文件的文件夹)是
/somepath/catmydata.csv
JavaSparkContext context = ......; //Retrieve list of files from master file List<String> files = context.textFile("/somepath/catmydata.csv") .flatMap(row->Arrays.asList(row.split(",")).iterator()).collect(); //Iterate through each file and save it as object file Map<String,JavaRDD<Cat>> adoptedCatsMap =new HashMap<String,JavaRDD<Cat>>(); for (String file:files) { // Extract file path and attach to convert String output=String.format("/convertedcatdata/%s",file.substring(file.indexOf('/'),file.length())) JavaRDD<Cat> adoptedCatsRdd = context.textFile(file).map(s->new Cat(s)); adoptedCatsMap.put(output,adoptedCatsRdd); //This will write number of files based on rdd partitions, // if you want only one file in a directory then save it // to temporary directory then use copyMerge to create one file using Hadoop fs FileUtil // https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html //Save to physical location if required. adoptedCatsRdd.saveAsObjectFile(output); }
我希望这会有所帮助。
关于java - 如何使用Spark处理CSV数据并输出序列化的自定义JavaObject?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46535922/