java - 根据特定列对 Spark Dataframe 进行分区,并将每个分区的内容转储到 csv 上

标签 java apache-spark apache-spark-sql

我正在使用 Spark 1.6.2 Java API 在 Dataframe DF1 中加载一些数据,如下所示:

Key  Value
A    v1
A    v2
B    v3
A    v4

现在我需要根据“Key”列中的值子集对 DF1 进行分区,并将每个分区转储到 csv 文件(使用 Spark-csv)。

期望的输出:

A.csv

Key Value
A   v1
A   v2
A   v4

B.csv

Key Value
B   v3

目前我正在做的是构建一个 HashMap (myList),其中包含我需要过滤的值的子集,然后迭代该值,每次迭代过滤不同的 Key。通过以下代码,我得到了我想要的,但我想知道是否有更有效的方法来做到这一点:

DF1 = <some operations>.cache();

for (Object filterKey: myList.keySet()) {
  DF2 = DF1.filter((String)myList.get(filterKey));

  DF2.write().format.format("com.databricks.spark.csv")
            .option("header", "true")
      .save("/" + filterKey + ".csv");
}

最佳答案

你已经差不多完成了,你只需要添加partitionBy,它将按照你想要的方式对文件进行分区。

DF1
  .filter{case(key, value) => myList.contains(key))
  .write
  .partitionBy("key")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/my/basepath/")

文件现在将存储在“/my/basepath/key=A/”、“/my/basepath/key=B/”等下。

关于java - 根据特定列对 Spark Dataframe 进行分区,并将每个分区的内容转储到 csv 上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40691710/

相关文章:

java - 表格宽度太大

java - 尽管完全不使用或引用Hadoop,但Apache Spark运行时异常 “Unable to load native-hadoop library for your platform”

amazon-web-services - 尝试从本地计算机访问 AWS 上的 HDFS 文件时出现错误

scala - spark数据帧爆炸功能错误

scala - 迭代 DataFrame 时更新列

java - 检测 Java 类以创建反向 Java 调试器的最佳方法是什么?

java - HttpMime 4.0.3 中的 InputStreamBody 设置内容长度

java - 使用简单的领域特定语言过滤集合

Java/Spark - 按加权平均聚合分组

scala - 通过 Spark 读取文件夹中保存的所有 Parquet 文件