scala - 在scala中将列从一个数据帧添加到另一个数据帧

标签 scala apache-spark dataframe

这个问题在这里已经有了答案:





Apache Spark how to append new column from list/array to Spark dataframe

(2 个回答)



Append a column to Data Frame in Apache Spark 1.3

(4 个回答)


4年前关闭。




我有两个行数相同的 DataFrame,但列数根据来源是不同的和动态的。

第一个 DataFrame 包含所有列,但第二个 DataFrame 被过滤和处理,没有其他所有列。

需要从第一个 DataFrame 中选择特定列并与第二个 DataFrame 添加/合并。

val sourceDf = spark.read.load(parquetFilePath)
val resultDf = spark.read.load(resultFilePath)

val columnName :String="Col1"

我尝试了几种添加方式,这里我只给出几个....
val modifiedResult = resultDf.withColumn(columnName, sourceDf.col(columnName))

val modifiedResult = resultDf.withColumn(columnName, sourceDf(columnName))
val modifiedResult = resultDf.withColumn(columnName, labelColumnUdf(sourceDf.col(columnName)))

这些都不起作用。

你能帮我把列从第一个数据帧合并/添加到第二个数据帧吗?

给定的示例不是我需要的确切数据结构,但它将满足我解决此问题的要求。

样本输入输出:
Source DataFrame:
+---+------+---+
|InputGas|
+---+------+---+
|1000|
|2000|
|3000|
|4000|
+---+------+---+

Result DataFrame:
+---+------+---+
| Time|CalcGas|Speed|
+---+------+---+
|  0 | 111| 1111|
|  0 | 222| 2222|
|  1 | 333| 3333|
|  2 | 444| 4444|
+---+------+---+

Expected Output:
+---+------+---+
|Time|CalcGas|Speed|InputGas|
+---+------+---+---+
|  0|111 | 1111 |1000|
|  0|222 | 2222 |2000|
|  1|333 | 3333 |3000|
|  2|444 | 4444 |4000|
+---+------+---+---+

最佳答案

一种使用 join 实现此目的的方法

如果您在两个数据框中都有一些共同的列,那么您可以对该列执行连接并获得您想要的结果。

示例:

import sparkSession.sqlContext.implicits._

val df1 = Seq((1, "Anu"),(2, "Suresh"),(3, "Usha"), (4, "Nisha")).toDF("id","name")
val df2 = Seq((1, 23),(2, 24),(3, 24), (4, 25), (5, 30), (6, 32)).toDF("id","age")

val df = df1.as("df1").join(df2.as("df2"), df1("id") === df2("id")).select("df1.id", "df1.name", "df2.age")
df.show()

输出:
+---+------+---+
| id|  name|age|
+---+------+---+
|  1|   Anu| 23|
|  2|Suresh| 24|
|  3|  Usha| 24|
|  4| Nisha| 25|
+---+------+---+

更新:

如果您在两个数据帧中都没有任何共同的唯一 ID,则创建一个并使用它。
import sparkSession.sqlContext.implicits._
import org.apache.spark.sql.functions._

var sourceDf = Seq(1000, 2000, 3000, 4000).toDF("InputGas")
var resultDf  = Seq((0, 111, 1111), (0, 222, 2222), (1, 333, 3333), (2, 444, 4444)).toDF("Time", "CalcGas", "Speed")

sourceDf = sourceDf.withColumn("rowId1", monotonically_increasing_id())
resultDf = resultDf.withColumn("rowId2", monotonically_increasing_id())

val df = sourceDf.as("df1").join(resultDf.as("df2"), sourceDf("rowId1") === resultDf("rowId2"), "inner").select("df1.InputGas", "df2.Time", "df2.CalcGas", "df2.Speed")
df.show()

输出:
+--------+----+-------+-----+
|InputGas|Time|CalcGas|Speed|
+--------+----+-------+-----+
|    1000|   0|    111| 1111|
|    2000|   0|    222| 2222|
|    3000|   1|    333| 3333|
|    4000|   2|    444| 4444|
+--------+----+-------+-----+

关于scala - 在scala中将列从一个数据帧添加到另一个数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47028442/

相关文章:

scala - 将 RDD[String] 转换为 RDD[Row] 到 Dataframe Spark Scala

scala - 为什么 scala 编译器有时会在 "pattern matching"上发出警告,有时却不会?

scala - Spark提交期间如何解决DB2 java.io.CharConversionException

apache-spark - Spark : Is there differences between agg function and a window function on a spark dataframe?

python - 检查给定列表中的元素是否存在于 DataFrame 的数组列中

bash - 使用 scala.sys.process 执行 bash 字符串

hadoop - 如何在Hortonworks Edge Node中安装最新版本的Apache Spark

python - 使用掩码计算DataFrame中的平均值

python:指定观察周围的窗口

python - 从不同的大文件中打乱数据的有效方法