scala - spark 数据框将行值转换为列名

标签 scala dataframe apache-spark apache-spark-sql spark-streaming

我需要使用 spark dataframe 将行值转换为列并按用户 ID 进行分区并创建一个 csv 文件。


val someDF = Seq(
  ("user1", "math","algebra-1","90"),
  ("user1", "physics","gravity","70"),
  ("user3", "biology","health","50"),
  ("user2", "biology","health","100"),
  ("user1", "math","algebra-1","40"),
  ("user2", "physics","gravity-2","20")
).toDF("user_id", "course_id","lesson_name","score")

someDF.show(false)

+-------+---------+-----------+-----+
|user_id|course_id|lesson_name|score|
+-------+---------+-----------+-----+
|  user1|     math|  algebra-1|   90|
|  user1|  physics|    gravity|   70|
|  user3|  biology|     health|   50|
|  user2|  biology|     health|  100|
|  user1|     math|  algebra-1|   40|
|  user2|  physics|  gravity-2|   20|
+-------+---------+-----------+-----+

val result = someDF.groupBy("user_id", "course_id").pivot("lesson_name").agg(first("score"))

result.show(false)

+-------+---------+---------+-------+---------+------+
|user_id|course_id|algebra-1|gravity|gravity-2|health|
+-------+---------+---------+-------+---------+------+
|  user3|  biology|     null|   null|     null|    50|
|  user1|     math|       90|   null|     null|  null|
|  user2|  biology|     null|   null|     null|   100|
|  user2|  physics|     null|   null|       20|  null|
|  user1|  physics|     null|     70|     null|  null|
+-------+---------+---------+-------+---------+------+


通过上面的代码,我可以将行值(类(class)名称)转换为列名称。 但我需要将输出保存在 course_wise

中的 csv 中

预计在 csv 格式下应该是这样的。

biology.csv // Expected Output

+-------+---------+------+
|user_id|course_id|health|
+-------+---------+------+
|  user3|  biology|  50  |
|  user2|  biology| 100  |
+-------+---------+-------

physics.csv // Expected Output

+-------+---------+---------+-------
|user_id|course_id|gravity-2|gravity|
+-------+---------+---------+-------+
|  user2|  physics|  50     |  null |
|  user1|  physics| 100     |  70   | 
+-------+---------+---------+-------+

**注意:csv 中的每门类(class)应仅包含其特定的类(class)名称,不应包含任何不相关的类(class)类(class)名称。

实际上在 csv 中我可以在下面的 formate 中**

result.write
  .partitionBy("course_id")
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(somepath)


例如:

biology.csv // Wrong output, Due to it is containing non-relevant course lesson's(algebra-1,gravity-2,algebra-1)
+-------+---------+---------+-------+---------+------+
|user_id|course_id|algebra-1|gravity|gravity-2|health|
+-------+---------+---------+-------+---------+------+
|  user3|  biology|     null|   null|     null|    50|
|  user2|  biology|     null|   null|     null|   100|
+-------+---------+---------+-------+---------+------+

谁能帮忙解决这个问题?

最佳答案

只需在调整之前按类(class)过滤:

val result = someDF.filter($"course_id" === "physics").groupBy("user_id", "course_id").pivot("lesson_name").agg(first("score"))

+-------+---------+-------+---------+
|user_id|course_id|gravity|gravity-2|
+-------+---------+-------+---------+
|user2  |physics  |null   |20       |
|user1  |physics  |70     |null     |

+--------+--------+------+--------+

关于scala - spark 数据框将行值转换为列名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57727480/

相关文章:

algorithm - 如何根据模式匹配将一个 map 中的值替换为另一个 map 中的值?

scala - 具有定期更新的静态数据集的结构化流

apache-spark - 长时间运行时打开的文件太多 Kafka 异常

python - 将数据框 Pandas 的列更改为其中一列的值

apache-spark - 为什么 insertInto 失败并显示 "assertion failed: No plan for InsertIntoTable"?

scala - Promise.tryComplete中的参数指的是什么?

java - 如何在 Java 测试文件中使用 ScalaTest?

scala - 尽管参数类型不同,但双重定义错误

python - 转换为 html 表时删除 pandas 数据框中的索引

python-3.x - 如何制作跨多个列的 Pandas 框架值,其列