scala - Apply a function to each row of a Spark DataFrame

Tags: scala apache-spark dataframe apache-spark-sql

I have a Spark DataFrame (df) with 2 columns (Report_id, Cluster_number).

I want to apply a function (getClusterInfo) to df that will return the name of each cluster, i.e. if the cluster number is "3", then for a particular report_id the 3 rows shown below would be written:

{"cluster_id":"1","influencers":[{"screenName":"A"},{"screenName":"B"},{"screenName":"C"},...]}
{"cluster_id":"2","influencers":[{"screenName":"D"},{"screenName":"E"},{"screenName":"F"},...]}
{"cluster_id":"3","influencers":[{"screenName":"G"},{"screenName":"H"},{"screenName":"E"},...]}

I am using foreach on df to apply the getClusterInfo function, but I can't figure out how to convert the output into a DataFrame of the shape (Report_id, Array[cluster_info]).

Here is the code snippet:

  df.foreach(row => {
    val report_id = row(0)
    val cluster_no = row(1).toString
    val cluster_numbers = new Range(0, cluster_no.toInt - 1, 1)
    for (cluster <- cluster_numbers.by(1)) {
      val cluster_id = report_id + "_" + cluster
      //get cluster influencers
      val result = getClusterInfo(cluster_id)
      println(result.get)
      val res : String = result.get.toString()
      // TODO ?
    }
    .. //TODO ?
  })

Best Answer

Generally speaking, you shouldn't use foreach when you want to map something into something else; foreach is good for applying functions that only have side effects and return nothing.
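
For illustration (a minimal sketch, not part of the original answer, assuming the df from the question with an integer Report_id column):

import spark.implicits._  // provides the Encoder needed by map below

// foreach: runs a side-effecting action on each row and returns Unit;
// nothing is captured for further processing
df.foreach(row => println(row))

// map: transforms each row into a value and returns a new Dataset
// that you can keep working with
val reportIds = df.map(row => row.getInt(0))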

In this case, if I got the details right (which I may not have), you can use a user-defined function (UDF) and explode the result:

import org.apache.spark.sql.functions._
import spark.implicits._

// I'm assuming we have these case classes (or similar)
case class Influencer(screenName: String)
case class ClusterInfo(cluster_id: String, influencers: Array[Influencer])

// I'm assuming this method is supplied - with your own implementation
def getClusterInfo(clusterId: String): ClusterInfo =
  ClusterInfo(clusterId, Array(Influencer(clusterId)))

// some sample data - assuming both columns are integers:
val df = Seq((222, 3), (333, 4)).toDF("Report_id", "Cluster_number")

// actual solution:

// UDF that returns an array of ClusterInfo;
// Array size is 'clusterNo', creates cluster id for each element and maps it to info
val clusterInfoUdf = udf { (clusterNo: Int, reportId: Int) =>
  (1 to clusterNo).map(v => s"${reportId}_$v").map(getClusterInfo)
}

// apply UDF to each record and explode - to create one record per array item
val result = df.select(explode(clusterInfoUdf($"Cluster_number", $"Report_id")))

result.printSchema()
// root
// |-- col: struct (nullable = true)
// |    |-- cluster_id: string (nullable = true)
// |    |-- influencers: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- screenName: string (nullable = true)

result.show(truncate = false)
// +-----------------------------+
// |col                          |
// +-----------------------------+
// |[222_1,WrappedArray([222_1])]|
// |[222_2,WrappedArray([222_2])]|
// |[222_3,WrappedArray([222_3])]|
// |[333_1,WrappedArray([333_1])]|
// |[333_2,WrappedArray([333_2])]|
// |[333_3,WrappedArray([333_3])]|
// |[333_4,WrappedArray([333_4])]|
// +-----------------------------+
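
If you also want to keep Report_id, or the (Report_id, Array[cluster_info]) shape mentioned in the question, a possible variation (a sketch under the same assumptions as the code above):

// keep Report_id next to each exploded cluster_info struct
val withId = df.select(
  $"Report_id",
  explode(clusterInfoUdf($"Cluster_number", $"Report_id")).as("cluster_info"))

// or skip explode to get one row per report with an array of cluster_info
val asArray = df.select(
  $"Report_id",
  clusterInfoUdf($"Cluster_number", $"Report_id").as("cluster_info"))

// to_json renders each struct as a string close to the format shown in the question
withId.select(to_json($"cluster_info")).show(truncate = false)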

The original question for "scala - Apply a function to each row of a Spark DataFrame" can be found on Stack Overflow: https://stackoverflow.com/questions/49329406/
