我已经定义了一个 AVRO 架构,并使用该方案的 avro-tools 生成了一些类。现在,我想将数据序列化到磁盘。我找到了一些关于 scala 的答案,但没有找到关于 Java 的答案。 Article
类是使用 avro-tools 生成的,并且是根据我定义的模式制作的。
这是我尝试执行此操作的代码的简化版本:
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
// The name of the file
String fileName = fileNameContent._1();
// The content of the file
String fileContent = fileNameContent._2();
// An object from my avro schema
Article a = new Article(fileContent);
Processing processing = new Processing();
// .... some processing of the content here ... //
processing.serializeArticleToDisk(avroFileName);
return a;
});
其中 serializeArticleToDisk(avroFileName)
定义如下:
public void serializeArticleToDisk(String filename) throws IOException{
// Serialize article to disk
DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
dataFileWriter.create(this.article.getSchema(), new File(filename));
dataFileWriter.append(this.article);
dataFileWriter.close();
}
其中 Article
是我的 avro 架构。
现在,映射器向我抛出错误:
java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)
. . . rest of the stacktrace ...
虽然文件路径是正确的。
之后我使用了 collect()
方法,因此 map
函数中的其他所有内容都工作正常(序列化部分除外)。
我对 Spark 还很陌生,所以我不确定这实际上是否是一件微不足道的事情。我怀疑我需要使用一些写入函数,而不是在映射器中进行写入(但不确定这是否属实)。有什么想法可以解决这个问题吗?
编辑:
我显示的错误堆栈跟踪的最后一行实际上位于这部分:
dataFileWriter.create(this.article.getSchema(), new File(文件名));
这是引发实际错误的部分。我假设 dataFileWriter
需要用其他东西替换。有什么想法吗?
最佳答案
此解决方案不使用数据帧,并且不会引发任何错误:
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
. . . . .
// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {
return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
});
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class,
job.getConfiguration());
其中AvroUtils.getJobOutputKeyAvroSchema
是:
public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
Job job;
try {
job = new Job();
} catch (IOException e) {
throw new RuntimeException(e);
}
AvroJob.setOutputKeySchema(job, avroSchema);
return job;
}
Spark + Avro 的类似内容可以在这里找到 -> https://github.com/CeON/spark-utils .
关于java - 如何在 Spark 中将数据序列化为 AVRO 模式(使用 Java)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36546983/