java - 迭代 Spark 数据集的行并在 Java API 中应用操作

标签 java apache-spark apache-spark-dataset

Spark (2.4.x) 新手并使用 Java API(不是 Scala!!!)

我有一个从 CSV 文件中读取的数据集。它有一个架构(命名列),如下所示:

id (integer)  |  name (string)  |  color (string)  |  price (double)  |  enabled (boolean)

示例行:

23 | "hotmeatballsoup" | "blue" | 3.95 | true

数据集中有许多(数万)行。我想使用正确的 Java/Spark API 编写一个表达式,滚动浏览每一行并对每一行应用以下两个操作:

  1. 如果价格为null,则默认为0.00;然后
  2. 如果颜色列值为“红色”,则在价格中添加 2.55

由于我对 Spark 很陌生,所以我什至不知道从哪里开始!迄今为止我最好的尝试肯定是错误的,但我想这至少是一个起点:

Dataset csvData = sparkSession.read()
    .format("csv")
    .load(fileToLoad.getAbsolutePath());

// ??? get rows somehow
Seq<Seq<String>> csvRows = csvData.getRows(???, ???);

// now how to loop through rows???
for (Seq<String> row : csvRows) {
    // how apply two operations specified above???
    if (row["price"] == null) {
        row["price"] = 0.00;
    }

    if (row["color"].equals("red")) {
        row["price"] = row["price"] + 2.55;
    }
}

有人可以帮助我朝正确的方向前进吗?

最佳答案

您可以使用spark sql api来实现它。空值也可以使用 DataFrameNaFunctions 中的 .fill() 替换为值。否则,您可以将 Dataframe 转换为 Dataset 并在 .map 中执行这些步骤,但在这种情况下 sql api 更好、更高效。

+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red|  1.0|   true|
| 24|            abc|  red| null|   true|
+---+---------------+-----+-----+-------+

在类声明之前导入sql函数:

import static org.apache.spark.sql.functions.*;

sql API:

df.select(
        col("id"), col("name"), col("color"),
        when(col("color").equalTo("red").and(col("price").isNotNull()), col("price").plus(2.55))
        .when(col("color").equalTo("red").and(col("price").isNull()), 2.55)
        .otherwise(col("price")).as("price")
        ,col("enabled")
).show();

或者使用临时 View 和sql查询:

df.createOrReplaceTempView("df");
spark.sql("select id,name,color, case when color = 'red' and price is not null then (price + 2.55) when color = 'red' and price is null then 2.55 else price end as price, enabled from df").show();

输出:

+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red| 3.55|   true|
| 24|            abc|  red| 2.55|   true|
+---+---------------+-----+-----+-------+

关于java - 迭代 Spark 数据集的行并在 Java API 中应用操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61106112/

相关文章:

Java Web 服务客户端到 WCF Web 服务服务器。互操作性解决方案

java - 捕获 Spark 驱动程序上的 Dataset foreachPartition() 函数中抛出的异常?

scala - Spark数据帧-按键减少

python - 为什么 python dataFrames' 只位于同一台机器上?

scala - Spark 可以将数据直接读取到嵌套的案例类中吗?

java - NoSuchBeanDefinitionException : No qualifying bean of type (JpaRepository and Java Config)

java - 在 Jersey 中,我可以将 QueryParams 和 FormParams 组合成一个方法的值吗?

java - 服务生成器没有添加一些依赖项?

eclipse - 在 eclipse Spark scala 调试 session 中,在哪里可以找到 RDD 中的数据?

scala - Spark流从Twitter获取数据并保存到Cassandra