java - Parquet Writer 到缓冲区或字节流

标签 java bufferedreader parquet

我有一个将 json 消息转换为 parquet 格式的 java 应用程序。是否有任何 Parquet 编写器可以写入 java 中的缓冲区或字节流?我见过的大多数示例都写入文件。

最佳答案

TLDR;您将需要实现 OutputFile,例如类似的东西:

import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.BufferedOutputStream;
import java.io.IOException;

public class ParquetBufferedWriter implements OutputFile {

    private final BufferedOutputStream out;

    public ParquetBufferedWriter(BufferedOutputStream out) {
        this.out = out;
    }

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    private PositionOutputStream createPositionOutputstream() {
        return new PositionOutputStream() {
            @Override
            public long getPos() throws IOException {
                return 0;
            }

            @Override
            public void write(int b) throws IOException {
                out.write(b);
            }
        };
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

}

你的作家会是这样的:

    ParquetBufferedWriter out = new ParquetBufferedWriter();
        try (ParquetWriter<Record> writer = AvroParquetWriter.
                <Record>builder(out)
                .withRowGroupSize(DEFAULT_BLOCK_SIZE)
                .withPageSize(DEFAULT_PAGE_SIZE)
                .withSchema(SCHEMA)
                .build()) {

            for (Record record : records) {
                writer.write(record);
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }

关于java - Parquet Writer 到缓冲区或字节流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40089689/

相关文章:

java - Azure VMSS 中 guest 操作系统指标和主机操作系统指标之间的差异

java - 动态 Web 模块方面的版本与 servlet 版本相同吗?

java - 在 Web 应用程序中的何处添加安全提供程序?

java - 使用 Buffered Reader 将用户输入添加到数组中

java - 如何获取 ISO-8859-15 中的 InputStreamReader?

apache-spark - Spark 与 Avro、Kryo 和 Parquet

java - 在矩阵中查找其元素总和为给定数字的路径

Python写二进制文件,字节

Tensorflow 数据集 API : input pipeline with parquet files

apache-kafka - 如何在达到特定大小 (128 Mb) 时将 Kafka 消息提交到 HDFS 接收器