java - 如何使用纯Java(包括日期和小数类型)生成Parquet文件并将其上传到S3 [Windows](无HDFS)

标签 java apache-spark amazon-s3 avro parquet

最近,我有一个要求,我需要生成Parquet文件,该文件只能由Apache Spark仅使用Java读取(不使用其他软件安装,例如:Apache Drill,Hive,Spark等)。这些文件需要保存到S3,因此我将共享有关如何同时执行这两个操作的详细信息。

没有简单的遵循指南来执行此操作。我也不是Java程序员,所以使用Maven,Hadoop等的概念对我来说是陌生的。因此,我花了将近两个星期的时间才能完成这项工作。我想在下面分享我的个人指导,以了解如何实现这一目标

最佳答案

免责声明:以下代码示例绝不代表最佳实践,仅作为粗略的方法介绍。

依赖项:

  • parquet-avro(1.9.0):https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro/1.9.0(我们使用1.9.0,因为此版本使用的Avro 1.8+支持小数和日期)
  • hadoop-aws(2.8.2)[如果您不打算编写S3,则不需要此功能,但您需要添加通常由于此而添加的其他一些依赖项。我不会介绍这种情况。因此,即使仅在本地磁盘上生成Parquet文件,您仍然可以将其作为依赖项添加到您的项目中]:https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.8.2(我们使用它是因为当时它是最新版本)
  • Hadoop 2.8.1:https://github.com/steveloughran/winutils/tree/master/hadoop-2.8.1(我们使用2.8.X,因为它需要匹配parquet-avro和hadoop-aws依赖项中使用的hadoop库)

    我将使用 NetBeans 作为我的IDE。

    有关Java中 Parquet 的一些信息(适用于像我这样的菜鸟):
  • 为了将数据序列化为Parquet,您必须选择一种流行的Java数据序列化框架:Avro,Protocol Buffers或Thrift(我将使用Avro(1.8.0),如我们的parquet-avro所示)依赖)
  • 您将需要使用支持Maven的IDE。这是因为上面的依赖项本身具有很多依赖项。 Maven将自动为您下载这些文件(例如NuGet for VisualStudio)

  • 前提条件:

    您必须在将运行Java代码的Windows机器上安装hadoop。好消息是您不需要安装整个hadoop软件,而仅需要两个文件:
  • hadoop.dll
  • winutils.exe

  • 这些可以下载到here。对于此示例,您将需要版本2.8.1(由于parquet-avro 1.9.0)。
  • 将这些文件复制到目标计算机上的 C:\hadoop-2.8.1\bin
  • 添加一个名为 HADOOP_HOME 的新系统变量(非用户变量),值 C:\hadoop-2.8.1

    systemvariable
  • 修改系统路径变量(不是用户变量),并将以下内容添加到末尾:%HADOOP_HOME%\bin
  • 重新启动计算机以使更改生效。

  • 如果未正确完成此配置,则在运行时会出现以下错误:java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    编码入门:
  • 首先创建一个新的空Maven项目,并添加parquet-avro 1.9.0和hadoop-aws 2.8.2作为依赖项:dependency
  • 创建您的主类,您可以在其中编写一些代码
  • 首先,您需要生成一个模式。现在,据我所知,您无法在运行时以编程方式生成模式。 Schema.Parser 类的parse()方法仅将文件或字符串文字作为参数,并且不允许您在创建架构后对其进行修改。
    为了避免这种情况,我在运行时生成了Schema JSON并对其进行了解析。以下是示例架构:
    String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
        + "\"type\": \"record\"," //Must be set as record
        + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
        + "\"fields\": ["
        + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
        + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
        + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
        + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
        + " ]}";
    Parser parser = new Schema.Parser().setValidate(true);
    Schema avroSchema = parser.parse(schema);
    

    有关Avro模式的详细信息可以在这里找到:https://avro.apache.org/docs/1.8.0/spec.html
  • 接下来,我们可以开始生成记录(Avro基本类型很简单):
    GenericData.Record record = new GenericData.Record(avroSchema);
    record.put("myInteger", 1);
    record.put("myString", "string value 1");
    
  • 为了生成十进制逻辑类型,必须将固定的字节基本类型用作存储的实际数据类型。当前的Parquet格式仅支持固定长度的字节数组(也称为fixed_len_byte_array)。因此,在我们的情况下,我们也必须使用固定的(在模式中可以看到)。在Java中,我们必须使用BigDecimal才能真正处理小数。而且我已经确定,不管值如何,Decimal(32,4)所占用的字节数都不会超过16个字节。因此,在下面的序列化中(以及上面的模式中),我们将使用16个标准字节数组大小:
  • BigDecimal myDecimalValue = new BigDecimal("99.9999");
    
    //First we need to make sure the BigDecimal matches our schema scale:
    myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
    
    //Next we get the decimal value as one BigInteger (like there was no decimal point)
    BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
    
    //Finally we serialize the integer
    byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
    
    //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
    GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
    
    byte[] myDecimalBuffer = new byte[16];
    if (myDecimalBuffer.length >= decimalBytes.length) {            
        //Because we set our fixed byte array size as 16 bytes, we need to
        //pad-left our original value's bytes with zeros
        int myDecimalBufferIndex = myDecimalBuffer.length - 1;
        for(int i = decimalBytes.length - 1; i >= 0; i--){
            myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
            myDecimalBufferIndex--;
        }
        //Save result
        fixed.bytes(myDecimalBuffer);
    } else {
        throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
    }
    
    //We can finally write our decimal to our record
    record.put("myDecimal", fixed);
    
  • 对于Date值,Avro指定我们需要将自EPOCH起的天数保存为整数。 (如果还需要时间部分,例如实际的DateTime类型,则需要使用 Timestamp Avro类型,我将不介绍它)。
    我发现自epoch以来获得天数的最简单方法是使用joda-time库。如果您将 hadoop-aws 依赖项添加到您的项目中,则您应该已经有了该库。如果不是,则需要自己添加:
    //Get epoch value
    MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);
    
    DateTime currentDate = new DateTime(); //Can take Java Date in constructor
    Days days = Days.daysBetween(epoch, currentDate);
    
    //We can write number of days since epoch into the record
    record.put("myDate", days.getDays());
    
  • 我们终于可以这样开始编写 Parquet 文件了
    try {
       Configuration conf = new Configuration();
       conf.set("fs.s3a.access.key", "ACCESSKEY");
       conf.set("fs.s3a.secret.key", "SECRETKEY");
       //Below are some other helpful settings
       //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
       //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
       //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
       //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors
    
       Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
    
       //Use path below to save to local file system instead
       //Path path = new Path("data.parquet");
    
       try (ParquetWriter writer = AvroParquetWriter.builder(path)
               .withSchema(avroSchema)
               .withCompressionCodec(CompressionCodecName.GZIP)
               .withConf(conf)
               .withPageSize(4 * 1024 * 1024) //For compression
               .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
               .build()) {
           //We only have one record to write in our example
           writer.write(record);
       }
    } catch (Exception ex) { ex.printStackTrace(System.out); }
  • 以下是加载到Apache Spark(2.2.0)中的数据:
    spark

  • 为了方便起见,整个源代码为:
    package com.mycompany.stackoverflow;
    
    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.math.RoundingMode;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Days;
    import org.joda.time.MutableDateTime;
    
    public class Main {
        public static void main(String[] args) {
            System.out.println("Start");
    
            String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
                    + "\"type\": \"record\"," //Must be set as record
                    + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
                    + "\"fields\": ["
                    + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
                    + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
                    + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
                    + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
                    + " ]}";
    
            Schema.Parser parser = new Schema.Parser().setValidate(true);
            Schema avroSchema = parser.parse(schema);
    
            GenericData.Record record = new GenericData.Record(avroSchema);
            record.put("myInteger", 1);
            record.put("myString", "string value 1");
    
            BigDecimal myDecimalValue = new BigDecimal("99.9999");
    
            //First we need to make sure the huge decimal matches our schema scale:
            myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
    
            //Next we get the decimal value as one BigInteger (like there was no decimal point)
            BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
    
            //Finally we serialize the integer
            byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
    
            //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
            GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
    
            byte[] myDecimalBuffer = new byte[16];
            if (myDecimalBuffer.length >= decimalBytes.length) {            
                //Because we set our fixed byte array size as 16 bytes, we need to
                //pad-left our original value's bytes with zeros
                int myDecimalBufferIndex = myDecimalBuffer.length - 1;
                for(int i = decimalBytes.length - 1; i >= 0; i--){
                    myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
                    myDecimalBufferIndex--;
                }
    
                //Save result
                fixed.bytes(myDecimalBuffer);
            } else {
                throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
            }
    
            //We can finally write our decimal to our record
            record.put("myDecimal", fixed);
    
            //Get epoch value
            MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);
    
            DateTime currentDate = new DateTime(); //Can take Java Date in constructor
            Days days = Days.daysBetween(epoch, currentDate);
    
            //We can write number of days since epoch into the record
            record.put("myDate", days.getDays());
    
            try {
               Configuration conf = new Configuration();
               conf.set("fs.s3a.access.key", "ACCESSKEY");
               conf.set("fs.s3a.secret.key", "SECRETKEY");
               //Below are some other helpful settings
               //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
               //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
               //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
               //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors.
    
               Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
    
               //Use path below to save to local file system instead
               //Path path = new Path("data.parquet");
    
               try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
                       .withSchema(avroSchema)
                       .withCompressionCodec(CompressionCodecName.GZIP)
                       .withConf(conf)
                       .withPageSize(4 * 1024 * 1024) //For compression
                       .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
                       .build()) {
    
                   //We only have one record to write in our example
                   writer.write(record);
               }
            } catch (Exception ex) { 
                ex.printStackTrace(System.out);
            }
        }
    }
    

    关于java - 如何使用纯Java(包括日期和小数类型)生成Parquet文件并将其上传到S3 [Windows](无HDFS),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47355038/

    相关文章:

    java - 使用 JBoss 或 Tomcat 的独立应用程序

    java - 字符串看起来相同但不匹配;列表迭代器异常

    java - 将 Angular 嵌入到 Spring 应用程序中并在运行 ngserve 时访问 Spring Controller

    java - 如何基于列合并两个数据帧spark java/scala?

    scala - Scala Spark 中的 groupBy 函数需要 Lzocodec 吗?

    url - 限制我的移动应用程序访问云端签名 URL(GET 请求)

    amazon-web-services - 从 Pyspark 调用 AWS S3 存储桶时出错。 AWS 错误代码 : null, AWS 错误消息:错误请求

    java - 使用 Quartz 在 Java 中进行异步调度

    java - Spark - 在本地主机上执行时急切加载和缓存 RDD

    amazon-web-services - 部署应用程序时如何在Amazon ElasticSearch中设置对queryESFunction的授权?