java - 使用 java 将索引列添加到 apache Spark Dataset<Row>

标签 java apache-spark

下面的问题有 scala 和 pyspark 的解决方案,并且该问题提供的解决方案不适用于连续索引值。

Spark Dataframe :How to add a index Column : Aka Distributed Data Index

我在 Apache-spark 中有一个现有的数据集,我想根据索引从中选择一些行。我计划添加一个索引列,其中包含从 1 开始的唯一值,并且根据该列的值我将获取行。 我发现下面的方法来添加使用 order by 的索引:

df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));

我不想使用 order by。我需要索引的顺序与数据集中存在的顺序相同。有什么帮助吗?

最佳答案

根据我收集的信息,您正在尝试向数据帧添加索引(具有连续值)。不幸的是,Spark 中没有内置函数可以执行此操作。您只能使用 df.withColumn("index", monotonicallyIncreasingId) 添加递增索引(但不一定是连续值)。

尽管如此,RDD API 中存在一个 zipWithIndex 函数,它可以完全满足您的需要。因此,我们可以定义一个函数,将数据帧转换为 RDD,添加索引并将其转换回数据帧。

我不是java中spark的专家(scala更紧凑),所以也许可以做得更好。这是我的做法。

public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
    JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
        Row r = t._1;
        Long index = t._2 + 1;
        ArrayList<Object> list = new ArrayList<>();
        r.toSeq().iterator().foreach(x -> list.add(x));
        list.add(index);
        return RowFactory.create(list);
    });
    StructType newSchema = df.schema()
            .add(new StructField(name, DataTypes.LongType, true, null));
    return df.sparkSession().createDataFrame(rdd, newSchema);
}

以下是您将如何使用它。请注意内置 Spark 函数的作用与我们的方法的作用对比。

Dataset<Row> df = spark.range(5)
    .withColumn("index1", functions.monotonicallyIncreasingId());
Dataset<Row> result = zipWithIndex(df, "good_index");
// df
+---+-----------+
| id|     index1|
+---+-----------+
|  0|          0|
|  1| 8589934592|
|  2|17179869184|
|  3|25769803776|
|  4|25769803777|
+---+-----------+

// result
+---+-----------+----------+
| id|     index1|good_index|
+---+-----------+----------+
|  0|          0|         1|
|  1| 8589934592|         2|
|  2|17179869184|         3|
|  3|25769803776|         4|
|  4|25769803777|         5|
+---+-----------+----------+

关于java - 使用 java 将索引列添加到 apache Spark Dataset<Row>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56163337/

相关文章:

apache-spark - 根据日期过滤 Spark 数据框

apache-spark - PySpark 如何按值排序(如果值相等则按键排序)?

Java乘法奇怪的行为

java - 使用 Netty 进行编写的有效方法

java - 在 Java 中创建 UTF-8 文件

apache-spark - 处理大数据集时的 FetchFailedException 或 MetadataFetchFailedException

java - Tomcat 7 上的 Spring 应用程序问题

Java Stream API 字符串操作

scala - 使用 Map 替换 Spark 中的列值

python - 有没有办法在不破坏函数链的情况下在 PySpark 中执行强制转换或 withColumn 数据帧操作?