I am receiving Avro records from Kafka, and I want to convert these records into Parquet files. I have been following this blog post: http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/
The code I have so far looks roughly like this:
// The method name "write" is illustrative; fileName, record, and avroData
// are supplied by the surrounding Kafka Connect sink task.
void write(final String fileName, SinkRecord record, final AvroData avroData) throws IOException {
    // Convert the Kafka Connect schema into an Avro schema
    final Schema avroSchema = avroData.fromConnectSchema(record.valueSchema());
    CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

    int blockSize = 256 * 1024 * 1024;  // 256 MB row-group size
    int pageSize = 64 * 1024;           // 64 KB page size

    Path path = new Path(fileName);
    ParquetWriter<GenericRecord> writer =
            new AvroParquetWriter<>(path, avroSchema, compressionCodecName, blockSize, pageSize);
}
Now, this performs the Avro-to-Parquet conversion, but it writes the Parquet file to disk. I was wondering if there is an easier way to keep the file in memory so that I don't have to manage temporary files on disk. Thanks.
Best Answer
Please take a look at my blog post, https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/ (translate it to English if necessary).
package yanbin.blog;

import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// An OutputFile implementation that buffers the entire Parquet file in memory.
public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE calls this method
        return new InMemoryPositionOutputStream(baos);
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return create(blockSizeHint); // Mode.OVERWRITE would call this; delegate to create()
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

    // Returns the bytes of the finished Parquet file.
    public byte[] toArray() {
        return baos.toByteArray();
    }

    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        }

        @Override
        public long getPos() throws IOException {
            // The current write position is simply the number of bytes buffered so far.
            return ((ByteArrayOutputStream) this.getStream()).size();
        }
    }
}
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
    // All objects share the same generated schema, so take it from the first one.
    Schema avroSchema = avroObjects.get(0).getSchema();
    GenericData genericData = GenericData.get();
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());

    InMemoryOutputFile outputFile = new InMemoryOutputFile();
    try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
            .withDataModel(genericData)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withWriteMode(ParquetFileWriter.Mode.CREATE)
            .build()) {
        avroObjects.forEach(r -> {
            try {
                writer.write(r);
            } catch (IOException ex) {
                // Lambdas cannot throw checked exceptions, so wrap and rethrow.
                throw new UncheckedIOException(ex);
            }
        });
    }

    // dump memory data to file for testing
    Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());
}
Test the data from memory:
$ parquet-tools cat --json users-memory.parquet
$ parquet-tools schema users-memory.parquet
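If you want to verify the data without dumping it to disk at all, the read side can be sketched in the same style. The InMemoryInputFile below is an assumption on my part rather than part of the original answer; it relies on org.apache.parquet.io.DelegatingSeekableInputStream and on a parquet-avro version that exposes AvroParquetReader.builder(InputFile).

import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;

// Sketch: an InputFile that serves Parquet bytes from an in-memory byte array.
public class InMemoryInputFile implements InputFile {
    private final byte[] data;

    public InMemoryInputFile(byte[] data) {
        this.data = data;
    }

    @Override
    public long getLength() {
        return data.length;
    }

    @Override
    public SeekableInputStream newStream() {
        return new DelegatingSeekableInputStream(new ByteArrayInputStream(data)) {
            @Override
            public long getPos() throws IOException {
                // For a ByteArrayInputStream: position = total length - bytes still available
                return data.length - getStream().available();
            }

            @Override
            public void seek(long newPos) throws IOException {
                getStream().reset();                      // rewind to the default mark (offset 0)
                long skipped = getStream().skip(newPos);  // move forward to the requested position
                if (skipped != newPos) {
                    throw new IOException("Could not seek to position " + newPos);
                }
            }
        };
    }
}

// Read the records straight back from the in-memory bytes, no temp file:
try (ParquetReader<GenericRecord> reader =
         AvroParquetReader.<GenericRecord>builder(new InMemoryInputFile(outputFile.toArray()))
             .withDataModel(GenericData.get())
             .build()) {
    GenericRecord record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
}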
This is based on a similar question on Stack Overflow: java - Converting Avro to Parquet in Memory, https://stackoverflow.com/questions/39631812/