apache-spark - 如何在 pyspark 数据帧读取方法中包含分区列

标签 apache-spark pyspark apache-spark-sql pyspark-dataframes

我正在从 parquet 文件编写基于文件的 Avro。我已阅读如下文件:

读取数据

dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
    .load("/Users/rashmik/flight-time.parquet")

写入数据

我已经用 Avro 格式编写了如下文件:

dfParquetRePartitioned.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path", "datasink/avro") \
    .partitionBy("OP_CARRIER") \
    .option("maxRecordsPerFile", 100000) \
    .save()

正如预期的那样,我得到了按 OP_CARRIER 分区的数据。

从特定分区读取 Avro 分区数据

在另一项工作中,我需要从上述工作的输出中读取数据,即从 datasink/avro 目录中读取数据。我正在使用下面的代码从 datasink/avro

读取
dfAvro = spark.read.format("avro") \
    .option("mode","FAILFAST") \
    .load("datasink/avro/OP_CARRIER=AA")

它成功读取数据,但正如预期的那样,OP_CARRIER 列在 dfAvro 数据帧中不可用,因为它是第一个作业的分区列。现在我的要求是在第二个数据帧中也包括 OP_CARRIER 字段,即在 dfAvro 中。有人可以帮我解决这个问题吗?

我指的是来自 spark document 的文档, 但我无法找到相关信息。任何指针都会非常有帮助。

最佳答案

您使用不同的别名复制相同的列值。

dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER)) \
.write \
.format("avro") \
.mode("overwrite") \
.option("path", "datasink/avro") \
.partitionBy("OP_CARRIER") \
.option("maxRecordsPerFile", 100000) \
.save()

这会给你你想要的。但别名不同。 或者你也可以在阅读时做。如果位置是动态的,那么您可以轻松地追加该列。

path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=") 
dfAvro = spark.read.format("avro") \
.option("mode","FAILFAST") \
.load(path).withColumn(newcol[0], lit(newcol[1]))

如果该值是静态的,则在读取数据期间更容易添加它。

关于apache-spark - 如何在 pyspark 数据帧读取方法中包含分区列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63532323/

相关文章:

apache-spark - 如何从两个列表创建 PySpark 数据框?

scala - 如何在 Spark 中打印特定 RDD 分区的元素?

apache-spark - 在 Azure HDIinsight 集群中指定 --files 时,Spark 在 yarn 集群模式下提交失败

apache-spark - 是作为执行引擎还是应用程序?

scala - 如何动态创建列引用?

java - 识别父数据框中不存在于java子集数据框中的记录

hadoop - 尝试在 Spark 中使用 Jena elephas 的 TriplesInputFormat 读取 RDF 文件时出现 NullPointerException

scala - Spark 1.3.0 的 API 中 SchemaRDD 的声明在哪里

python - 如何 : Pyspark dataframe persist usage and reading-back

python - Pyspark 将结构数组转换为字符串