scala - SparkSQL Dataframe 函数是否爆炸保留顺序?

标签 scala apache-spark apache-spark-sql

我有一个 Scala spark DataFrame:

df.select($"row_id", $"array_of_data").show
+----------+--------------------+
| row_id   |      array_of_data |
+----------+--------------------+
|       0  | [123, ABC, G12]    |
|       1  | [100, 410]         |
|       2  | [500, 300,  ...]   |

我想分解这些数组,以便每个元素都在不同的行中,但我还想标记哪一行对应于数组的第一个元素:
+----------+--------------------+----------+----------+
| row_id   |      array_of_data | exploded | is_first |
+----------+--------------------+----------+----------+
|       0  | [123, ABC, G12]    | 123      |    Yes   |
|       0  | [123, ABC, G12]    | ABC      |    No    |
|       0  | [123, ABC, G12]    | G12      |    No    |

为了实现这一点,我使用了 explode function ,并希望第一行对应第一个数据元素:
var exploded_df = df.withColumn("exploded", explode($"array_of_data"))

val window = Window.partitionBy("row_id").orderBy("row_id")
// Create an internal rank variable to figure out the first element
exploded_df = exploded_df.withColumn("_rank", row_number().over(window))
exploded_df = exploded_df.withColumn("is_first",
                                     when(($"_rank" === 1), "Yes").otherwise("No")
                                    )

这似乎适用于我的目的并产生所需的输出,但我可以相信这将始终有效吗?我在爆炸文档中找不到任何 promise 这种行为的地方,相信 Spark 数据帧中的行顺序似乎是不明智的。

我能想到的唯一其他解决方案是为 array_of_data 中的每个元素创建一个新列。然后匹配时 exploded等于第一列的值,但我不能保证数组中不会有重复的值。

最佳答案

您可以使用 posexplode为此目的而发挥作用。

正如api文档解释的那样

Creates a new row for each element with position in the given array or map column.



您可以使用 select功能,以便位置和分解列形成单独的列作为
import org.apache.spark.sql.functions._
df.select($"row_id", posexplode($"array_of_data")).show(false)

这应该给你
+------+---------------+---+---+
|row_id|array_of_data  |pos|col|
+------+---------------+---+---+
|0     |[123, ABC, G12]|0  |123|
|0     |[123, ABC, G12]|1  |ABC|
|0     |[123, ABC, G12]|2  |G12|
+------+---------------+---+---+

关于scala - SparkSQL Dataframe 函数是否爆炸保留顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49157068/

相关文章:

scala - Spark : How can DataFrame be Dataset[Row] if DataFrame's have a schema

apache-spark - 为什么 Spark 计数 Action 分三个阶段执行

javac 和 scalac 警告仅针对特定类型的错误

scala - 为什么 sbt 认为这是一个 'unknown artifact?'

apache-spark - 为什么kryo注册在SparkSession中不起作用?

apache-spark - 如何在 Spark RDD 中按多个键进行分组?

scala - bigquery 在数组外添加重复记录

scala - 提升 - 将值列表绑定(bind)到模板

scala - 从Spark作业访问HDFS HA(UnknownHostException错误)

dataframe - 计算 Spark 数据帧中缺失值的数量