scala - 将 Spark DataFrame 模式转换为新模式

标签 scala apache-spark dataframe

我有多个从不同来源读取的 spark 作业,它们具有不同的架构,但它们非常接近,我想要做的是将它们全部写入同一个 Redshift 表,因此我需要统一所有 DataFrame 架构,什么是最好的方法吗?

假设第一个输入数据的架构如下:

  val schema1 = StructType(Seq(
    StructField("date", DateType),
    StructField("campaign_id", StringType),
    StructField("campaign_name", StringType),
    StructField("platform", StringType),
    StructField("country", StringType),
    StructField("views", DoubleType),
    StructField("installs", DoubleType),
    StructField("spend", DoubleType)
  ))

seconf inout 源的架构如下:
  val schema2 = StructType(Seq(
    StructField("date", DateType),
    StructField("creator_id", StringType),
    StructField("creator_name", StringType),
    StructField("platform", StringType),
    StructField("views", DoubleType),
    StructField("installs", DoubleType),
    StructField("spend", DoubleType),
    StructField("ecpm", DoubleType)
  ))

表架构(预期统一数据帧):
  val finalSchema = StructType(Seq(
    StructField("date", DateType),
    StructField("account_name", StringType),
    StructField("adset_id", StringType),
    StructField("adset_name", StringType),
    StructField("campaign_id", StringType),
    StructField("campaign_name", StringType),
    StructField("pub_id", StringType),
    StructField("pub_name", StringType),
    StructField("creative_id", StringType),
    StructField("creative_name", StringType),
    StructField("platform", StringType),
    StructField("install_source", StringType),
    StructField("views", IntegerType),
    StructField("clicks", IntegerType),
    StructField("installs", IntegerType),
    StructField("cost", DoubleType)
  ))

正如您在最终模式中看到的那样,我有一些列可能不在输入模式中,因此它应该为空,一些列名称也应该重命名。还有一些列如 ecpm应该掉线。

最佳答案

添加 index columnsdataframesjoin它们基于 index所以会有一对一的映射。之后 select只有您想要的 columns来自 joined dataframe .

  • 如果您有两个 dataframes像下面
    // df1.show
    +-----+---+
    | name|age|
    +-----+---+
    |Alice| 25|
    |  Bob| 29|
    |  Tom| 26|
    +-----+---+
    
    //df2.show
    +--------+-------+
    |    city|country|
    +--------+-------+
    |   Delhi|  India|
    |New York|    USA|
    |  London|     UK|
    +--------+-------+
    
  • 现在添加 index columns并获得一对一映射
    import org.apache.spark.sql.functions._
    
    val df1Index=df1.withColumn("index1",monotonicallyIncreasingId)
    
    val df2Index=df2.withColumn("index2",monotonicallyIncreasingId)
    
    val joinedDf=df1Index.join(df2Index,df1Index("index1")===df2Index("index2"))
    
    //joinedDf
    
    +-----+---+------+--------+-------+------+
    | name|age|index1|    city|country|index2|
    +-----+---+------+--------+-------+------+
    |Alice| 25|     0|   Delhi|  India|     0|
    |  Bob| 29|     1|New York|    USA|     1|
    |  Tom| 26|     2|  London|     UK|     2|
    +-----+---+------+--------+-------+------+
    

  • 现在您可以编写如下查询
    val queryList=List(col("name"),col("age"),col("country"))
    joinedDf.select(queryList:_*).show
    
    //Output df
    +-----+---+-------+
    | name|age|country|
    +-----+---+-------+
    |Alice| 25|  India|
    |  Bob| 29|    USA|
    |  Tom| 26|     UK|
    +-----+---+-------+
    

    关于scala - 将 Spark DataFrame 模式转换为新模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51638572/

    相关文章:

    scala - 循环遍历 Map Spark Scala

    Scala Currying 和函数字面量

    python - pd.merge_asof 每个时间段有多个匹配项?

    java - Java 的 ArrayList/Scala 的 ArrayBuffer 可能有改进吗?

    scala - 如何在Scala中将对象列表转换为两个字段的映射

    apache-spark - 这个简单的例子是Spark还是Hadoop?

    azure - hdinsight actionscript 安装 spark 1.2

    hadoop - 我正在CDH5.4上使用Hbase 1.0.0和Apache phoenix 4.3.0。当我重新启动Hbase regionserver时关闭

    R 日期范围数据帧到每小时总持续时间

    python - Pyspark 数据框连接有少量重复的列名和少量没有重复的列