sql - Scala/Apache Spark 转换 DataFrame 列值和类型,否则为多个

标签 sql scala apache-spark cassandra

我有一个主 SQL 表,正在将其读入 Spark 并修改以写入 CassandraDB。目前,我有一个将性别从 0、1、2、3(整数)转换为“男”、“女”、“跨性别”等(字符串)的工作实现。虽然下面的方法确实有效,但将这些映射创建一个单独的数组到 DataFrame 中,将其连接到主表/DataFrame 中,然后删除、重命名等似乎非常低效。

我见过:

.withColumn("gender", when(col("gender) === 1, "male").otherwise("female") 

这将允许我继续在主表上进行方法链接,但无法让它使用两个以上的选项。有没有办法做到这一点?我在此表上有大约 10 个不同的列,每个列都需要创建自己的自定义转换。由于此代码将处理 TB 级的数据,是否有一种重复性更少且更有效的方法来完成此任务。感谢您提前提供的任何帮助!

case class Gender(tmpid: Int, tmpgender: String)

private def createGenderDf(spark:SparkSession): DataFrame = {
  import spark.implicits._
  Seq(
    Gender(1, "Male"),
    Gender(2, "Female"),
    Gender(777, "Prefer not to answer")
  ).toDF
}


private def createPersonsDf(spark: SparkSession): DataFrame = {
  val genderDf = createGenderDf(spark)
  genderDf.show()

  val personsDf: DataFrame = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load(dataPath + "people.csv")
    .withColumnRenamed("ID", "id")
    .withColumnRenamed("name_first", "firstname")

  val personsDf1: DataFrame = personsDf
    .join(genderDf, personsDf("gender") === genderDf("tmpid"), "leftouter")

  val personsDf2: DataFrame = personsDf1
    .drop("gender")
    .drop("tmpid")
    .withColumnRenamed("tmpgender", "gender")
}

最佳答案

您可以使用嵌套的 when 函数,这将消除您创建 genderDfjoindrop重命名等。对于您的示例,您可以执行以下操作

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
personsDf.withColumn("gender", when(col("gender") === 1, "male").otherwise(when(col("gender") ===2, "female").otherwise("Prefer not to answer")).cast(StringType))

您可以在上述嵌套结构中添加更多 when 函数,并且也可以对其他 10 列重复相同的操作。

关于sql - Scala/Apache Spark 转换 DataFrame 列值和类型,否则为多个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46165835/

相关文章:

mysql - 在 7000 条记录上 13 秒后完成左连接 - mysql

php - 如何使用 jointable 进行 dql 查询?

mysql - 获取日期范围内的最大数据而不重复MYSQL

scala - 为什么 Scala 允许嵌套数据结构,如 List 或 Array

hadoop - Spark 不会在 yarn-cluster 模式下运行 final `saveAsNewAPIHadoopFile` 方法

mysql - 删除 MySQL 中部分相似的行

scala - 断言 RDD 未排序

scala - 类是抽象的;无法在凿子上实例化错误

r - 如何将函数应用于 SparkR 中的每一行?

scala - Spark Scala 总结数据集的列元素?