scala - Changing the value of a nested column in a DataFrame

Tags: scala apache-spark dataframe

I have a DataFrame with two levels of nested fields:

 root
 |-- request: struct (nullable = true)
 |    |-- dummyID: string (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- fooID: string (nullable = true)
 |    |    |-- barID: string (nullable = true)

I want to update the value of the fooID column here. I was able to update values at the first level, e.g. the dummyID column, using this question as a reference: How to add a nested column to a DataFrame.
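
For reference, a minimal sketch of that first-level approach (rebuilding the top-level struct with withColumn; here df is assumed to hold the frame with the schema above, and the replacement value is illustrative):

import org.apache.spark.sql.functions.{col, lit, struct}

val firstLevel = df.withColumn(
  "request",
  struct(
    lit("new_test_id").as("dummyID"), // replaced first-level value
    col("request.data").as("data")    // remaining fields carried over as-is
  )
)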

Input data:

{
    "request": {
        "dummyID": "test_id",
        "data": {
            "fooID": "abc",
            "barID": "1485351"
        }
    }
}

Output data:

{
    "request": {
        "dummyID": "test_id",
        "data": {
            "fooID": "def",
            "barID": "1485351"
        }
    }
}

How can I do this in Scala?

Best Answer

Here is a generic solution to this problem that can update any number of nested values, at any level, based on an arbitrary function applied in a recursive traversal:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

def mutate(df: DataFrame, fn: Column => Column): DataFrame = {
  // Get a projection with fields mutated by `fn` and select it
  // out of the original frame with the schema reassigned to the original
  // frame (explained later)
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn):_*).rdd, df.schema)
}

def traverse(schema: StructType, fn: Column => Column, path: String = ""): Array[Column] = {
  schema.fields.map(f => {
    f.dataType match {
      case s: StructType => struct(traverse(s, fn, path + f.name + "."): _*)
      case _ => fn(col(path + f.name))
    }
  })
}

This is effectively equivalent to the usual "redefine the entire struct as a projection" solutions, but it re-nests the fields automatically using the original structure and preserves nullability/metadata (which are lost when you redefine the structs manually). Annoyingly, there is no way to preserve those properties while creating the projection (AFAICT), so the code above reassigns the schema manually.
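
For instance, applied to the question's own schema (a hypothetical call, assuming df holds the question's data and lit is imported from org.apache.spark.sql.functions):

// Replace the nested string field request.data.fooID with "def"
val updated = mutate(df, c =>
  if (c.toString == "request.data.fooID") lit("def") else c
)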

An example application:

case class Organ(name: String, count: Int)
case class Disease(id: Int, name: String, organ: Organ)
case class Drug(id: Int, name: String, alt: Array[String])

// Note: assumes `import spark.implicits._` is in scope for toDF
val df = Seq(
  (1, Drug(1, "drug1", Array("x", "y")), Disease(1, "disease1", Organ("heart", 2))),
  (2, Drug(2, "drug2", Array("a")), Disease(2, "disease2", Organ("eye", 3)))
).toDF("id", "drug", "disease")

df.show(false)

+---+------------------+-------------------------+
|id |drug              |disease                  |
+---+------------------+-------------------------+
|1  |[1, drug1, [x, y]]|[1, disease1, [heart, 2]]|
|2  |[2, drug2, [a]]   |[2, disease2, [eye, 3]]  |
+---+------------------+-------------------------+

// Update the integer field ("count") at the lowest level:
val df2 = mutate(df, c => if (c.toString == "disease.organ.count") c - 1 else c)
df2.show(false)

+---+------------------+-------------------------+
|id |drug              |disease                  |
+---+------------------+-------------------------+
|1  |[1, drug1, [x, y]]|[1, disease1, [heart, 1]]|
|2  |[2, drug2, [a]]   |[2, disease2, [eye, 2]]  |
+---+------------------+-------------------------+

// This will NOT necessarily be equal unless the metadata and nullability
// of all fields is preserved (as the code above does)
assertResult(df.schema.toString)(df2.schema.toString)

One limitation of this is that it cannot add new fields, only update existing ones (though the map can be changed to a flatMap, with the function returning Array[Column], if you don't care about preserving nullability/metadata).
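
A hypothetical sketch of that flatMap variant (names are illustrative; note the explicit aliases, since the result schema can no longer simply be reassigned from the original):

def traverseMulti(schema: StructType, fn: Column => Array[Column], path: String = ""): Array[Column] = {
  schema.fields.flatMap(f => {
    f.dataType match {
      case s: StructType => Array(struct(traverseMulti(s, fn, path + f.name + "."): _*).as(f.name))
      // `fn` may now return zero or more columns per leaf, so it can drop or
      // add fields; callers are responsible for aliasing what it returns
      case _ => fn(col(path + f.name))
    }
  })
}

// Used directly as a projection, e.g. df.select(traverseMulti(df.schema, fn): _*)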

Also, here is a more generic version for Dataset[T]:

import org.apache.spark.sql.{Dataset, Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

case class Record(id: Int, drug: Drug, disease: Disease)

def mutateDS[T](df: Dataset[T], fn: Column => Column)(implicit enc: Encoder[T]): Dataset[T] = {
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn):_*).rdd, enc.schema).as[T]
}

// To call as typed dataset:
val fn: Column => Column = c => if (c.toString == "disease.organ.count") c - 1 else c
mutateDS(df.as[Record], fn).show(false)

// To call as untyped dataset:
implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) // This is necessary regardless of sparkSession.implicits._ imports
mutateDS(df, fn).show(false)

The question and answer above ("scala - Changing the value of a nested column in a DataFrame") are based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/50123771/
