Recently I had a requirement to generate Parquet files that could be read by Apache Spark, using only Java (with no additional software installations such as Apache Drill, Hive, Spark, etc.). The files also needed to be saved to S3, so I will share the details of how to do both.
There was no simple guide to follow for this. I am also not a Java programmer, so concepts like Maven and Hadoop were foreign to me, and it took me almost two weeks to get everything working. Below is my personal guide on how to achieve it.
Best Answer
Disclaimer: the code samples below in no way represent best practice; they are presented only as a rough illustration of the approach.
Dependencies:
I will be using NetBeans as my IDE.
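The dependency list itself did not survive in this copy of the answer. Based on the versions referenced later (parquet-avro 1.9.0, Hadoop 2.8.1 binaries, and the hadoop-aws library), a Maven setup along these lines should work; the exact hadoop-aws version is my assumption:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.1</version>
    </dependency>
</dependencies>
```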
Some information about Parquet in Java (for newbies like me):
Prerequisites:
You must have hadoop installed on the Windows machine that will run the Java code. The good news is that you do not need to install all of the hadoop software; you only need two files:
These can be downloaded online. For this example you will need version 2.8.1 (because of parquet-avro 1.9.0).
If this setup is not done correctly, you will get the following error at runtime:
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
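If setting the HADOOP_HOME environment variable system-wide is inconvenient, Hadoop also honors the hadoop.home.dir system property, which can be set programmatically before any Hadoop class is loaded. The sketch below assumes a hypothetical install location of C:\hadoop (whose bin\ subdirectory would contain the native Windows files):

```java
public class HadoopHome {
    public static void main(String[] args) {
        //Hypothetical install location -- point this at the folder whose
        //bin\ subdirectory contains the native Windows files.
        //This must run before any org.apache.hadoop class is initialized.
        System.setProperty("hadoop.home.dir", "C:\\hadoop");
        System.out.println(System.getProperty("hadoop.home.dir"));
    }
}
```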
Getting started with the code:
Rather than maintaining the Avro schema in a separate file, I generate the schema JSON at runtime and parse it. Here is a sample schema:
String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
+ "\"type\": \"record\"," //Must be set as record
+ "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
+ "\"fields\": ["
+ " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
+ " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
+ " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
+ " ]}";
Schema.Parser parser = new Schema.Parser().setValidate(true);
Schema avroSchema = parser.parse(schema);
More details on Avro schemas can be found here: https://avro.apache.org/docs/1.8.0/spec.html
GenericData.Record record = new GenericData.Record(avroSchema);
record.put("myInteger", 1);
record.put("myString", "string value 1");
Writing decimals is a bit trickier. To be readable, they must be stored in the Parquet file as a fixed_len_byte_array, so in our case we must use the fixed type as well (as can be seen in the schema). In Java we have to use BigDecimal to truly handle decimals, and I have determined that a Decimal(32,4) never takes more than 16 bytes, regardless of its value. So we will use a standard byte-array size of 16 in the serialization below (and in the schema above):
BigDecimal myDecimalValue = new BigDecimal("99.9999");
//First we need to make sure the BigDecimal matches our schema scale:
myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
//Next we get the decimal value as one BigInteger (like there was no decimal point)
BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
//Finally we serialize the integer
byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
//We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
byte[] myDecimalBuffer = new byte[16];
if (myDecimalBuffer.length >= decimalBytes.length) {
    //Because we set our fixed byte array size as 16 bytes, we need to
    //pad-left our original value's bytes with zeros
    int myDecimalBufferIndex = myDecimalBuffer.length - 1;
    for (int i = decimalBytes.length - 1; i >= 0; i--) {
        myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
        myDecimalBufferIndex--;
    }
    //Save result
    fixed.bytes(myDecimalBuffer);
} else {
    throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
}
//We can finally write our decimal to our record
record.put("myDecimal", fixed);
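One caveat worth flagging: zero-padding the high-order bytes, as above, is only correct for non-negative values, because Parquet decimals are two's-complement; a negative BigDecimal would need its padding bytes filled with 0xFF. A small helper (my own sketch, not part of the original answer) that handles both signs:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;

public class DecimalPadding {
    //Left-pads the two's-complement bytes of a scaled decimal to a fixed size,
    //sign-extending so that negative values round-trip correctly.
    static byte[] toFixedBytes(BigDecimal value, int scale, int size) {
        byte[] raw = value.setScale(scale, RoundingMode.HALF_UP)
                          .unscaledValue()
                          .toByteArray();
        if (raw.length > size) {
            throw new IllegalArgumentException("Decimal needs " + raw.length + " bytes, max is " + size);
        }
        byte[] buf = new byte[size];
        //Two's-complement sign extension: 0xFF when the sign bit is set, 0x00 otherwise
        byte pad = (byte) (raw[0] < 0 ? 0xFF : 0x00);
        Arrays.fill(buf, 0, size - raw.length, pad);
        System.arraycopy(raw, 0, buf, size - raw.length, raw.length);
        return buf;
    }
}
```

The resulting array can be passed to fixed.bytes(...) exactly like myDecimalBuffer above.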
I found the easiest way to get the number of days since the epoch is with the joda-time library. If you added the hadoop-aws dependency to your project, you should already have it; if not, you will need to add it yourself:
//Get epoch value
MutableDateTime epoch = new MutableDateTime(0L, DateTimeZone.UTC);
DateTime currentDate = new DateTime(); //Can take Java Date in constructor
Days days = Days.daysBetween(epoch, currentDate);
//We can write number of days since epoch into the record
record.put("myDate", days.getDays());
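If you would rather not depend on joda-time, the same days-since-epoch value can be computed with Java 8's built-in java.time (my own alternative, not from the original answer):

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

public class EpochDays {
    public static void main(String[] args) {
        //toEpochDay() is exactly the number of days since 1970-01-01
        int daysSinceEpoch = (int) LocalDate.now(ZoneOffset.UTC).toEpochDay();
        //record.put("myDate", daysSinceEpoch); //same value joda-time produces
        System.out.println(daysSinceEpoch);
    }
}
```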
try {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", "ACCESSKEY");
    conf.set("fs.s3a.secret.key", "SECRETKEY");
    //Below are some other helpful settings
    //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
    //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
    //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors

    Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
    //Use path below to save to local file system instead
    //Path path = new Path("data.parquet");

    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.GZIP)
            .withConf(conf)
            .withPageSize(4 * 1024 * 1024) //For compression
            .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
            .build()) {
        //We only have one record to write in our example
        writer.write(record);
    }
} catch (Exception ex) {
    ex.printStackTrace(System.out);
}
For convenience, here is the entire source code:
package com.mycompany.stackoverflow;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.MutableDateTime;
public class Main {
    public static void main(String[] args) {
        System.out.println("Start");

        String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
                + "\"type\": \"record\"," //Must be set as record
                + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
                + "\"fields\": ["
                + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
                + " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]},"
                + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
                + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
                + " ]}";
        Schema.Parser parser = new Schema.Parser().setValidate(true);
        Schema avroSchema = parser.parse(schema);

        GenericData.Record record = new GenericData.Record(avroSchema);
        record.put("myInteger", 1);
        record.put("myString", "string value 1");

        BigDecimal myDecimalValue = new BigDecimal("99.9999");
        //First we need to make sure the BigDecimal matches our schema scale:
        myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
        //Next we get the decimal value as one BigInteger (like there was no decimal point)
        BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
        //Finally we serialize the integer
        byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
        //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
        GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
        byte[] myDecimalBuffer = new byte[16];
        if (myDecimalBuffer.length >= decimalBytes.length) {
            //Because we set our fixed byte array size as 16 bytes, we need to
            //pad-left our original value's bytes with zeros
            int myDecimalBufferIndex = myDecimalBuffer.length - 1;
            for (int i = decimalBytes.length - 1; i >= 0; i--) {
                myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
                myDecimalBufferIndex--;
            }
            //Save result
            fixed.bytes(myDecimalBuffer);
        } else {
            throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
        }
        //We can finally write our decimal to our record
        record.put("myDecimal", fixed);

        //Get epoch value
        MutableDateTime epoch = new MutableDateTime(0L, DateTimeZone.UTC);
        DateTime currentDate = new DateTime(); //Can take Java Date in constructor
        Days days = Days.daysBetween(epoch, currentDate);
        //We can write number of days since epoch into the record
        record.put("myDate", days.getDays());

        try {
            Configuration conf = new Configuration();
            conf.set("fs.s3a.access.key", "ACCESSKEY");
            conf.set("fs.s3a.secret.key", "SECRETKEY");
            //Below are some other helpful settings
            //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
            //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
            //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
            //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors.

            Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
            //Use path below to save to local file system instead
            //Path path = new Path("data.parquet");

            try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
                    .withSchema(avroSchema)
                    .withCompressionCodec(CompressionCodecName.GZIP)
                    .withConf(conf)
                    .withPageSize(4 * 1024 * 1024) //For compression
                    .withRowGroupSize(16 * 1024 * 1024) //For write buffering (Page size)
                    .build()) {
                //We only have one record to write in our example
                writer.write(record);
            }
        } catch (Exception ex) {
            ex.printStackTrace(System.out);
        }
    }
}
This answer was found on a similar Stack Overflow question, java - How to generate a Parquet file using pure Java (including date and decimal types) and upload it to S3 [Windows] (no HDFS): https://stackoverflow.com/questions/47355038/