scala - 如何动态创建列引用?

标签 scala apache-spark apache-spark-sql

我有具有以下结构的 DataFrame df:

root
 |-- author: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- client: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- outbound_link: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- url: string (nullable = true)

我运行这段代码:

val sourceField = "outbound_link" // set automatically
val targetField = "url"           // set automatically
val nodeId = "client"             // set automatically

val result = df.as("df1").join(df.as("df2"),
        $"df1."+sourceField === $"df2."+targetField
        ).groupBy(
          ($"df1."+nodeId).as("nodeId_1"),
          ($"df2."+nodeId).as("nodeId_2")
        )
        .agg(
          count("*") as "value", max($"df1."+timestampField) as "timestamp"
        )
        .toDF("source", "target", "value", "timestamp")

但我收到错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: syntax error in attribute name: df1.;

由于某种原因,变量 sourceFieldtargetFieldjoin 操作中不可见。这些变量不为空并且包含字段的名称。我必须使用变量,因为我在上一步代码中自动定义了它们。

最佳答案

确实是一个有趣的案例。查看 $"df1."+sourceField 并思考何时将 $"df1." 转换为 Column 的串联>“df1。”+sourceField

scala> val sourceField = "id"
sourceField: String = id

scala> $"df1."+sourceField
org.apache.spark.sql.AnalysisException: syntax error in attribute name: df1.;
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:151)
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:170)
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.quotedString(unresolved.scala:142)
  at org.apache.spark.sql.Column.<init>(Column.scala:137)
  at org.apache.spark.sql.ColumnName.<init>(Column.scala:1203)
  at org.apache.spark.sql.SQLImplicits$StringToColumn.$(SQLImplicits.scala:45)
  ... 55 elided

替换 $"df1."+sourceField 以使用 colcolumn 函数,应该没问题。

scala> col(s"df1.$sourceField")
res7: org.apache.spark.sql.Column = df1.id

关于scala - 如何动态创建列引用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50122752/

相关文章:

python - 使用udf将PySpark数据框中的纪元转换为日期时间

Scala 中的 JavaFX 找不到启动方法

java - 是否可以将 frege 与 Play 框架一起使用

scala - 如何将案例类放入 rdd 中并使其表现得像元组(对)?

scala - Spark 2 选项数据集

apache-spark - Parquet 文件是否保留 Spark DataFrames 的行顺序?

java - 如何用scala有效地替换多个字符?

scala - Alpakka卡夫卡vs卡夫卡流

python - 获取 java.lang.NoClassDefFoundError : scala/Product$class error when I am trying to use Spark lens in Pyspark Shell

java - 如何使用 java 以编程方式检索 yarn 的日志