java - 如何在没有 spark 或框架的情况下将 parquet 文件保存在 hdfs 中?

标签 java hadoop hdfs parquet

我想使用 java 将 parquet 文件直接保存到 hdfs。

这是我用来生成 parquet 文件并将它们存储在本地的代码,但现在我想将它们存储在 hdfs 中。

 final String schemaLocation = "/home/javier/FlinkProjects/kafka-flink/src/main/java/com/grallandco/demos/avro.json";
  final Schema avroSchema = new Schema.Parser().parse(new File(schemaLocation));
  final MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
  final WriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);
  final String parquetFile = "/home/javier/parquet-files/data" + postfijoFilename + ".parquet";
  final Path path = new Path(parquetFile);
  AvroParquetWriter parquetWriter = new AvroParquetWriter(path,
          avroSchema, CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
  final GenericRecord record = new GenericData.Record(avroSchema);
  record.put(Constantes.CAMPO_ID, datos[0]);
  record.put("movie", datos[1]);
  record.put("date", datos[2]);
  record.put("imdb", datos[3]);
  parquetWriter.write(record);

我想替换这个

 final String parquetFile = "/home/javier/parquet-files/data" + postfijoFilename + ".parquet";

有 hadoop hdfs 路径,知道吗???

最佳答案

您可以通过以下方式执行此操作(请注意,该位置必须存在并在代码中更改您的 hdfsurl 和用户名。可能需要在 hdfs 中包含架构):

final String schemaLocation = "/home/javier/FlinkProjects/kafka-flink/src/main/java/com/grallandco/demos/avro.json";
final Schema avroSchema = new Schema.Parser().parse(new File(schemaLocation));
final MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
final WriteSupport writeSupport = new AvroWriteSupport(parquetSchema,  avroSchema);
final Path path = new Path("/user/hduser/parquet-files/data" + 
       postfijoFilename + ".parquet");
Configuration configuration = new Configuration();
String hdfsUrl = "hdfs://hadoopnamenode:9000/";
String username = "hduser";
FileSystem fs= FileSystem.get(new URI(hdfsUrl), configuration);
UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(username);
ugi.doAs(new PrivilegedExceptionAction<Void>() {
            public Void run() throws Exception { 
                AvroParquetWriter parquetWriter = new 
                    AvroParquetWriter(path,
                    avroSchema, 
                    CompressionCodecName.SNAPPY,                
                    ParquetWriter.DEFAULT_BLOCK_SIZE, 
                    ParquetWriter.DEFAULT_PAGE_SIZE);
                final GenericRecord record = new 
                    GenericData.Record(avroSchema);
                record.put(Constantes.CAMPO_ID, datos[0]);
                record.put("movie", datos[1]);
                record.put("date", datos[2]);
                record.put("imdb", datos[3]);
                parquetWriter.write(record);
                return null;
                }
    });

关于java - 如何在没有 spark 或框架的情况下将 parquet 文件保存在 hdfs 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49160895/

相关文章:

java - java中如何计算当日结束时的库存量

java - 通过 "new"创建一个 String 对象

java - 在java中将字符串解析为日期

java - Flume:来自下游的意外异常。 java.io.IOException:对等重置连接

hadoop - 无法将数据从 HDFS 加载到 Hadoop 中的 Hive

java - 我的类没有重写抽象方法compareTo

sql - 如何使用 Select 语句在 Hive 中执行除法

amazon-web-services - 从 s3a 读取文件以及 AWS Athena SDK (1.11+)

hadoop - 如何解决错误 "file:/user/hive/warehouse/records is not a directory or unable to create one"?

hadoop - Impala 扫描 MapR-FS 慢