java - Converting Avro to Parquet in Memory

Tags: java, hadoop, avro, parquet

I am receiving Avro records from Kafka and would like to convert them into Parquet files. I am following this blog post: http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/

My code so far looks roughly like this:

final String fileName;
final SinkRecord record;
final AvroData avroData;

final Schema avroSchema = avroData.fromConnectSchema(record.valueSchema());
CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

int blockSize = 256 * 1024 * 1024; // 256 MB row groups
int pageSize = 64 * 1024;          // 64 KB pages

Path path = new Path(fileName);
writer = new AvroParquetWriter<>(path, avroSchema, compressionCodecName, blockSize, pageSize);

Now, this performs the Avro-to-Parquet conversion, but it writes the Parquet file to disk. I was wondering whether there is a simpler way to keep the file in memory, so that I don't have to manage temporary files on disk. Thanks.

Best Answer

Please see my blog post, https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/ (translate it into English if necessary).

package yanbin.blog;
 
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
 
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
 
public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE calls this method
        return new InMemoryPositionOutputStream(baos);
    }
 
    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return null; // unused here: the writer below is built with Mode.CREATE, which calls create()
    }
 
    @Override
    public boolean supportsBlockSize() {
        return false;
    }
 
    @Override
    public long defaultBlockSize() {
        return 0;
    }
 
    public byte[] toArray() {
        return baos.toByteArray();
    }
 
    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
 
        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        }
 
        @Override
        public long getPos() throws IOException {
            return ((ByteArrayOutputStream) this.getStream()).size();
        }
    }
}
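The key trick in `InMemoryOutputFile` is that `getPos()` reports the number of bytes written so far, which Parquet uses internally to record column-chunk offsets; backing the stream with a `ByteArrayOutputStream` makes its `size()` the current position. A minimal, dependency-free sketch of that idea (class names here are hypothetical, using only `java.io` instead of the Parquet interfaces):

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;

// Stand-in for DelegatingPositionOutputStream: the "position" is simply
// how many bytes the backing in-memory buffer has accumulated.
class CountingMemoryStream extends FilterOutputStream {
    private final ByteArrayOutputStream buffer;

    CountingMemoryStream(ByteArrayOutputStream buffer) {
        super(buffer);
        this.buffer = buffer;
    }

    long getPos() {
        return buffer.size(); // bytes written so far == current write position
    }
}

public class PositionDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        CountingMemoryStream out = new CountingMemoryStream(baos);
        out.write("PAR1".getBytes());      // e.g. the 4-byte Parquet magic number
        System.out.println(out.getPos());  // prints 4
        out.write(new byte[10]);
        System.out.println(out.getPos());  // prints 14
    }
}
```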
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
        Schema avroSchema = avroObjects.get(0).getSchema();
        GenericData genericData = GenericData.get();
        genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
        InMemoryOutputFile outputFile = new InMemoryOutputFile();
        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
                .withDataModel(genericData)
                .withSchema(avroSchema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriteMode(ParquetFileWriter.Mode.CREATE)
                .build()) {
            avroObjects.forEach(r -> {
                try {
                    writer.write(r);
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
                }
            });
        } // any IOException propagates to the caller via the method's throws clause
 
        // dump memory data to file for testing
        Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());
    }

Test the in-memory data:

$ parquet-tools cat --json users-memory.parquet
$ parquet-tools schema users-memory.parquet

A similar question on Stack Overflow: https://stackoverflow.com/questions/39631812/
