这实际上与我的 previous question 相同,但使用 Avro 而不是 JSON 作为数据格式。
我正在使用 Spark 数据框,它可以从几个不同的模式版本之一加载数据:
// Version One
{"namespace": "com.example.avro",
"type": "record",
"name": "MeObject",
"fields": [
{"name": "A", "type": ["null", "int"], "default": null}
]
}
// Version Two
{"namespace": "com.example.avro",
"type": "record",
"name": "MeObject",
"fields": [
{"name": "A", "type": ["null", "int"], "default": null},
{"name": "B", "type": ["null", "int"], "default": null}
]
}
我正在使用 Spark Avro加载数据。
DataFrame df = context.read()
.format("com.databricks.spark.avro")
.load("path/to/avro/file");
可能是版本一文件或版本二文件。但是我希望能够以相同的方式处理它,将未知值设置为“null”。我在上一个问题中的建议是设置模式,但是我不想重复自己在 .avro
文件和 spark StructType
和 friend 中编写模式。如何将 avro 架构(文本文件或生成的 MeObject.getClassSchema()
)转换为 spark StructType
?
Spark Avro 有一个 SchemaConverters
,但它都是私有(private)的,并返回一些奇怪的内部对象。
最佳答案
免责声明:这是一种肮脏的技巧。这取决于几件事:
- Python 提供了一个 lightweight Avro processing library由于它的活力,它不需要打字的作家
- 一个空的 Avro 文件仍然是一个有效的文件
- Spark 模式可以与 JSON 相互转换
以下代码读取 Avro 架构文件,使用给定架构创建空 Avro 文件,使用 spark-csv
读取它并将 Spark 架构输出为 JSON 文件。
import argparse
import tempfile
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from pyspark import SparkContext
from pyspark.sql import SQLContext
def parse_schema(schema):
with open(schema) as fr:
return avro.schema.parse(open(schema).read())
def write_dummy(schema):
tmp = tempfile.mktemp(suffix='.avro')
with open(tmp, "w") as fw:
writer = DataFileWriter(fw, DatumWriter(), schema)
writer.close()
return tmp
def write_spark_schema(path, schema):
with open(path, 'w') as fw:
fw.write(schema.json())
def main():
parser = argparse.ArgumentParser(description='Avro schema converter')
parser.add_argument('--schema')
parser.add_argument('--output')
args = parser.parse_args()
sc = SparkContext('local[1]', 'Avro schema converter')
sqlContext = SQLContext(sc)
df = (sqlContext.read.format('com.databricks.spark.avro')
.load(write_dummy(parse_schema(args.schema))))
write_spark_schema(args.output, df.schema)
sc.stop()
if __name__ == '__main__':
main()
用法:
bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \
avro_to_spark_schema.py \
--schema path_to_avro_schema.avsc \
--output path_to_spark_schema.json
读取模式:
import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}
val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]
关于java - 用于激发 StructType 的 Avro Schema,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33899417/