java - 如何在 Spark 中将数据序列化为 AVRO 模式(使用 Java)?

标签 java apache-spark hdfs avro spark-avro

我已经定义了一个 AVRO 架构,并使用该方案的 avro-tools 生成了一些类。现在,我想将数据序列化到磁盘。我找到了一些关于 scala 的答案,但没有找到关于 Java 的答案。 Article 类是使用 avro-tools 生成的,并且是根据我定义的模式制作的。

这是我尝试执行此操作的代码的简化版本:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String fileContent = fileNameContent._2();

    // An object from my avro schema
    Article a = new Article(fileContent);

    Processing processing = new Processing();
    // .... some processing of the content here ... //

    processing.serializeArticleToDisk(avroFileName);

    return a;
});

其中 serializeArticleToDisk(avroFileName) 定义如下:

public void serializeArticleToDisk(String filename) throws IOException{
    // Serialize article to disk
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
    dataFileWriter.create(this.article.getSchema(), new File(filename));
    dataFileWriter.append(this.article);
    dataFileWriter.close();
}

其中 Article 是我的 avro 架构。

现在,映射器向我抛出错误:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)   
at java.io.FileOutputStream.open0(Native Method)    
at java.io.FileOutputStream.open(FileOutputStream.java:270)     
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)   
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)   
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)   
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)     
. . . rest of the stacktrace ... 

虽然文件路径是正确的。

之后我使用了 collect() 方法,因此 map 函数中的其他所有内容都工作正常(序列化部分除外)。

我对 Spark 还很陌生,所以我不确定这实际上是否是一件微不足道的事情。我怀疑我需要使用一些写入函数,而不是在映射器中进行写入(但不确定这是否属实)。有什么想法可以解决这个问题吗?

编辑:

我显示的错误堆栈跟踪的最后一行实际上位于这部分:

dataFileWriter.create(this.article.getSchema(), new File(文件名));

这是引发实际错误的部分。我假设 dataFileWriter 需要用其他东西替换。有什么想法吗?

最佳答案

此解决方案不使用数据帧,并且不会引发任何错误:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

   .  .  .  .  .

// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {    
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
});
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
        job.getConfiguration());

其中AvroUtils.getJobOutputKeyAvroSchema是:

public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
    Job job;

    try {
        job = new Job();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    AvroJob.setOutputKeySchema(job, avroSchema);
    return job;
}

Spark + Avro 的类似内容可以在这里找到 -> https://github.com/CeON/spark-utils .

关于java - 如何在 Spark 中将数据序列化为 AVRO 模式(使用 Java)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36546983/

相关文章:

java - 如何在具有接口(interface)返回类型的方法中实现错误处理

scala - spark scala 数据帧时间戳转换排序?

java - 如何从 MapReduce 中引用本地 HDFS 文件?

hadoop - 启动备用HDFS HA名称节点的SIGTERM

java - 如何将自定义 View 添加到布局?

java - 如何配置连接到 AWS EMR spark 集群的 Java 客户端

apache-spark - 重新启动 Spark 后如何在 Web UI 中查看 'finished App' 详细信息

python - Spark DataFrame to Dict - 字典更新序列元素错误

hadoop - Hadoop previous.checkpoint位置

java - System.out.println() 在 for 循环中无法正常工作