java - 使用另一个数据集中的值搜索和更新 Spark 数据集列

标签 java apache-spark apache-spark-dataset apache-spark-2.0

此处为 Java 8 和 Spark 2.11:2.3.2。虽然我非常喜欢 Java API 答案,但我确实会说一点 Scala,所以我能够理解其中提供的任何答案!但是如果可能的话请使用 Java!

我有两个具有不同架构的数据集,但公共(public)“model_number”(字符串)列除外:两个数据集都存在。

对于第一个数据集中的每一行(我们将其称为 d1),我需要扫描/搜索第二个数据集(“d2”)以查看是否存在具有相同 model_number 的行,如果是,则更新另一个 d2 列。

这是我的数据集架构:

d1
===========
model_number : string
desc : string
fizz : string
buzz : date

d2
===========
model_number : string
price : double
source : string

同样,如果 d1 行的 model_number 为 12345,并且 d2 行也具有相同的 model_number,我想通过乘以 10.0 来更新 d2.price

迄今为止我最好的尝试:

// I *think* this would give me a 3rd dataset with all d1 and d2 columns, but only
// containing rows from d1 and d2 that have matching 'model_number' values
Dataset<Row> d3 = d1.join(d2, d1.col("model_number") == d2.col("model_number"));

// now I just need to update d2.price based on matching
Dataset<Row> d4 = d3.withColumn("adjusted_price", d3.col("price") * 10.0);

有人可以帮我冲过终点线吗?提前致谢!

最佳答案

这里有一些要点,正如 @VamsiPrabhala 在评论中提到的,您需要在特定字段上使用 join 函数。关于“update”,您需要记住中的dfdsrdd Spark 是不可变的,因此您无法更新它们。因此,这里的解决方案是,在join您的df之后,您需要在select中执行计算,在本例中为乘法或使用withColumn,然后select。换句话说,您无法更新该列,但可以使用“new”列创建新的df

示例:

Input data:

+------------+------+------+----+
|model_number|  desc|  fizz|buzz|
+------------+------+------+----+
|     model_a|desc_a|fizz_a|null|
|     model_b|desc_b|fizz_b|null|
+------------+------+------+----+

+------------+-----+--------+
|model_number|price|  source|
+------------+-----+--------+
|     model_a| 10.0|source_a|
|     model_b| 20.0|source_b|
+------------+-----+--------+

使用join将输出:

val joinedDF = d1.join(d2, "model_number")
joinedDF.show()

+------------+------+------+----+-----+--------+
|model_number|  desc|  fizz|buzz|price|  source|
+------------+------+------+----+-----+--------+
|     model_a|desc_a|fizz_a|null| 10.0|source_a|
|     model_b|desc_b|fizz_b|null| 20.0|source_b|
+------------+------+------+----+-----+--------+

应用您的计算:

joinedDF.withColumn("price", col("price") * 10).show()

output:
+------------+------+------+----+-----+--------+
|model_number|  desc|  fizz|buzz|price|  source|
+------------+------+------+----+-----+--------+
|     model_a|desc_a|fizz_a|null| 100.0|source_a|
|     model_b|desc_b|fizz_b|null| 200.0|source_b|
+------------+------+------+----+-----+--------+

关于java - 使用另一个数据集中的值搜索和更新 Spark 数据集列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59899454/

相关文章:

java - 使用 Geotools - JAVA 重命名另存为 TableName 的形状文件

hadoop - 如何读取 Spark 中的多行元素?

json - 使用 spark 和 Scala 读取文本文件中的 JSON

java - 不使用spark-submit.sh时,Spark如何知道Yarn资源管理器在哪里运行?

java - Spark Java中多列的聚合

java - 如何使用 JsonIter 从 json 获取特定的键值,通过使用任何可用的 getter 方法?

java - 如何使用 Java (Bing-Search-API) 从 JSON 访问嵌套元素

java - Spring Boot 2 - AJP

apache-spark - 使用数据集在 Apache Spark 中交叉加入非常慢

apache-spark - Hive 分区、Spark 分区和 Spark 中的连接 - 它们之间的关系