下面的问题有 scala 和 pyspark 的解决方案,并且该问题提供的解决方案不适用于连续索引值。
Spark Dataframe :How to add a index Column : Aka Distributed Data Index
我在 Apache-spark 中有一个现有的数据集,我想根据索引从中选择一些行。我计划添加一个索引列,其中包含从 1 开始的唯一值,并且根据该列的值我将获取行。 我发现下面的方法来添加使用 order by 的索引:
df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));
我不想使用 order by。我需要索引的顺序与数据集中存在的顺序相同。有什么帮助吗?
最佳答案
根据我收集的信息,您正在尝试向数据帧添加索引(具有连续值)。不幸的是,Spark 中没有内置函数可以执行此操作。您只能使用 df.withColumn("index", monotonicallyIncreasingId
) 添加递增索引(但不一定是连续值)。
尽管如此,RDD API 中存在一个 zipWithIndex
函数,它可以完全满足您的需要。因此,我们可以定义一个函数,将数据帧转换为 RDD,添加索引并将其转换回数据帧。
我不是java中spark的专家(scala更紧凑),所以也许可以做得更好。这是我的做法。
public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
Row r = t._1;
Long index = t._2 + 1;
ArrayList<Object> list = new ArrayList<>();
r.toSeq().iterator().foreach(x -> list.add(x));
list.add(index);
return RowFactory.create(list);
});
StructType newSchema = df.schema()
.add(new StructField(name, DataTypes.LongType, true, null));
return df.sparkSession().createDataFrame(rdd, newSchema);
}
以下是您将如何使用它。请注意内置 Spark 函数的作用与我们的方法的作用对比。
Dataset<Row> df = spark.range(5)
.withColumn("index1", functions.monotonicallyIncreasingId());
Dataset<Row> result = zipWithIndex(df, "good_index");
// df
+---+-----------+
| id| index1|
+---+-----------+
| 0| 0|
| 1| 8589934592|
| 2|17179869184|
| 3|25769803776|
| 4|25769803777|
+---+-----------+
// result
+---+-----------+----------+
| id| index1|good_index|
+---+-----------+----------+
| 0| 0| 1|
| 1| 8589934592| 2|
| 2|17179869184| 3|
| 3|25769803776| 4|
| 4|25769803777| 5|
+---+-----------+----------+
关于java - 使用 java 将索引列添加到 apache Spark Dataset<Row>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56163337/