I have a DataFrame df with the following structure:
root
|-- author: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- client: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- outbound_link: array (nullable = true)
| |-- element: string (containsNull = true)
|-- url: string (nullable = true)
I run this code:
val sourceField = "outbound_link" // set automatically
val targetField = "url" // set automatically
val nodeId = "client" // set automatically
val result = df.as("df1").join(df.as("df2"),
    $"df1."+sourceField === $"df2."+targetField
  )
  .groupBy(
    ($"df1."+nodeId).as("nodeId_1"),
    ($"df2."+nodeId).as("nodeId_2")
  )
  .agg(
    count("*") as "value", max($"df1."+timestampField) as "timestamp"
  )
  .toDF("source", "target", "value", "timestamp")
But I get this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: syntax error in attribute name: df1.;
For some reason, the variables sourceField and targetField are not visible inside the join operation. The variables are not empty and contain the field names. I have to use variables because I set them automatically in an earlier step of the code.
Best Answer
This is indeed an interesting case. Look at $"df1."+sourceField and think about when $"df1." gets converted to a Column, as opposed to the string concatenation "df1." + sourceField.
scala> val sourceField = "id"
sourceField: String = id
scala> $"df1."+sourceField
org.apache.spark.sql.AnalysisException: syntax error in attribute name: df1.;
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:151)
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:170)
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.quotedString(unresolved.scala:142)
at org.apache.spark.sql.Column.<init>(Column.scala:137)
at org.apache.spark.sql.ColumnName.<init>(Column.scala:1203)
at org.apache.spark.sql.SQLImplicits$StringToColumn.$(SQLImplicits.scala:45)
... 55 elided
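The stack trace shows the exception is thrown inside Column's constructor, i.e. before + ever runs: the $ interpolator (Spark's SQLImplicits.StringToColumn) builds a Column from the literal "df1." immediately, and a name with a trailing dot fails attribute-name parsing. A minimal pure-Scala sketch of the same evaluation order, using a hypothetical Col class standing in for Spark's Column:

```scala
object PrecedenceDemo {
  // Hypothetical stand-in for Spark's Column: rejects a trailing dot,
  // just like parseAttributeName does.
  case class Col(name: String) {
    require(!name.endsWith("."), s"syntax error in attribute name: $name")
    def +(s: String): String = name + s
  }

  // Mimics Spark's $"..." interpolator: it constructs a Col right away.
  implicit class Dollar(val sc: StringContext) extends AnyVal {
    def $(args: Any*): Col = Col(sc.parts.mkString)
  }

  def main(args: Array[String]): Unit = {
    val sourceField = "id"
    // Parses as ($"df1.") + sourceField, so Col("df1.") is constructed
    // first and throws; the + is never reached.
    val c = $"df1." + sourceField
  }
}
```

Running this fails with "syntax error in attribute name: df1." for the same reason as the Spark version.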
Replace $"df1."+sourceField with the col or column function and it should work fine.
scala> col(s"df1.$sourceField")
res7: org.apache.spark.sql.Column = df1.id
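Applying that fix throughout, the original snippet can be rewritten with col and string interpolation. This is a sketch: timestampField is assumed to be set in an earlier step like the other variables, since it does not appear in the question's declarations.

```scala
import org.apache.spark.sql.functions.{col, count, max}

val sourceField = "outbound_link" // set automatically
val targetField = "url"           // set automatically
val nodeId = "client"             // set automatically
val timestampField = "timestamp"  // assumed: set automatically in an earlier step

val result = df.as("df1").join(df.as("df2"),
    // col builds the Column from the full interpolated name at once,
    // so there is no premature conversion of "df1." alone.
    col(s"df1.$sourceField") === col(s"df2.$targetField")
  )
  .groupBy(
    col(s"df1.$nodeId").as("nodeId_1"),
    col(s"df2.$nodeId").as("nodeId_2")
  )
  .agg(
    count("*") as "value", max(col(s"df1.$timestampField")) as "timestamp"
  )
  .toDF("source", "target", "value", "timestamp")
```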
About "scala - How to dynamically create column references?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/50122752/