scala - 无法在 spark 中写入 csv 文件

标签 scala csv apache-spark spark-dataframe

我正在尝试使用 spark 1.6.1 编写一个 csv 文件。 假设我有一个这样的 csv 文件:

date,category
19900108,apples
19900108,apples
19900308,peaches
19900408,peaches
19900508,pears
19910108,pears
19910108,peaches
19910308,apples
19910408,apples
19910508,apples
19920108,pears
19920108,peaches
19920308,apples
19920408,peaches
19920508,pears

我想像这样创建一个输出 csv 文件:

date,apples,peaches,pears
1990,2,2,1
1991,3,1,1
1992,1,2,2

我正在使用这个 Scala 代码加载文件:

spark-shell --packages com.databricks:spark-csv_2.11:1.2.0

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sqlContext = new SQLContext(sc)

var df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema", "true").load("data/sample.csv")

df = df.withColumn("year", df.col("date").substr(0,4)) 
df.groupBy("year").pivot("category").agg("category"->"count").show()

当我运行它时,我得到了这个数据框,这正是我想要的

+----+------+-------+-----+
|year|apples|peaches|pears|
+----+------+-------+-----+
|1990|     2|      2|    1|
|1991|     3|      1|    1|
|1992|     1|      2|    2|
+----+------+-------+-----+

但是当我尝试使用此代码将其写入 CSV 时:

 df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata4.csv")

这是我打开时得到的 csv 文件,这不是我要找的。

date,category,year
19900108,apples,1990
19900108,apples,1990
19900308,peaches,1990
19900408,peaches,1990
19900508,pears,1990
19910108,pears,1991
19910108,peaches,1991
19910308,apples,1991
19910408,apples,1991
19910508,apples,1991
19920108,pears,1992
19920108,peaches,1992
19920308,apples,1992
19920408,peaches,1992
19920508,pears,1992

我错过了什么吗?难道我做错了什么?

最佳答案

您忘记将查询结果存储到新变量中。

val xf = df.groupBy("year").pivot("category").agg("category"->"count")

然后使用您的最后一行代码编写它。

xf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata4.csv")

关于scala - 无法在 spark 中写入 csv 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38944885/

相关文章:

apache-spark - 如何计算 Spark 逻辑回归中的 p 值?

java - 使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet

scala - 如何显示所有类型的对象(Scala中)的?

java - Flink BucketingSink 因 NoClassDefFoundError : Lorg/apache/hadoop/fs/FileSystem 崩溃

vim - 有没有人让 vim taglist 插件与 Scala 一起工作?

javascript - 禁止输入标点符号

java - 创建 SQL 插入语句到 CSV 文件

json - Grails-渲染的Json文件太大,无法进行客户端操作

scala - 在 Cucumber 步骤定义之间传递变量

apache-spark - 哪些操作保留 RDD 顺序?