scala - 如何使用带有自定义 UDF 的 DataFrame.explode 将字符串拆分为子字符串?

标签 scala apache-spark apache-spark-sql

我用的是 Spark 1.5 .

我有一个数据帧 A_DF如下:

+--------------------+--------------------+
|                  id|        interactions|
+--------------------+--------------------+
|        id1         |30439831,30447866...|
|        id2         |37597858,34499875...|
|        id3         |30447866,32896718...|
|        id4         |33029476,31988037...|
|        id5         |37663606,37627579...|
|        id6         |37663606,37627579...|
|        id7         |36922232,37675077...|
|        id8         |37359529,37668820...|
|        id9         |37675077,37707778...|
+--------------------+--------------------+

哪里interactionsString .我要爆炸这是通过首先拆分 interactions string 成一组由逗号分隔的子字符串,我尝试按如下方式执行:
val splitArr = udf { (s: String) => s.split(",").map(_.trim) }

val B_DF = A_DF.explode(splitArr($"interactions"))

但我收到以下错误:

error: missing arguments for method explode in class DataFrame;
follow this method with `_' if you want to treat it as a partially applied function A_DF.explode(splitArr($"interactions"))

我不明白。所以我尝试了更复杂的东西:
val B_DF = A_DF.explode($"interactions") { case (Row(interactions: String) =>
        interactions.split(",").map(_.trim))
     }

我收到了检查警告,内容如下:

Expression of Type Array[String] does not conform to expected type TraversableOnce[A_]

有任何想法吗?

最佳答案

Dataset.explode自 Spark 2.0.0 起已弃用。除非你有理由,否则远离它。你已经被警告过了。

如果您确实有理由使用 DataFrame.explode ,见签名:

explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]): DataFrame

explode[A <: Product](input: Column*)(f: (Row) ⇒ TraversableOnce[A])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[A]): DataFrame

无论哪种情况,explode使用两个参数组,因此是第一个错误。

(这是 Spark 2.1.0-SNAPSHOT )
scala> spark.version
res1: String = 2.1.0-SNAPSHOT

scala> val A_DF = Seq(("id1", "30439831,30447866")).toDF("id", "interactions")
A_DF: org.apache.spark.sql.DataFrame = [id: string, interactions: string]

scala> A_DF.explode(split($"interactions", ","))
<console>:26: error: missing argument list for method explode in class Dataset
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `explode _` or `explode(_)(_)(_)` instead of `explode`.
       A_DF.explode(split($"interactions", ","))
                   ^

您可以按如下方式操作(注意我使用 2.1.0-SNAPSHOT 时关于弃用 explode 的警告):
scala> A_DF.explode[String, String]("interactions", "parts")(_.split(",")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+--------+
| id|     interactions|   parts|
+---+-----------------+--------+
|id1|30439831,30447866|30439831|
|id1|30439831,30447866|30447866|
+---+-----------------+--------+

您可以使用另一个 explode如下:
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> case class Interaction(id: String, part: String)
defined class Interaction

scala> A_DF.explode[Interaction]($"id", $"interactions") { case Row(id: String, ins: String) => ins.split(",").map { it => Interaction(id, it) } }.show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+---+--------+
| id|     interactions| id|    part|
+---+-----------------+---+--------+
|id1|30439831,30447866|id1|30439831|
|id1|30439831,30447866|id1|30447866|
+---+-----------------+---+--------+

使用 explode function相反,你应该没问题,如 scaladoc 中所述(引用如下):

鉴于这已被弃用,作为替代方案,您可以使用 functions.explode() 分解列:
ds.select(explode(split('words, " ")).as("word"))

flatMap() :
ds.flatMap(_.words.split(" "))

然后你可以使用 explode功能如下:
A_DF.select($"id", explode(split('interactions, ",") as "part"))

关于scala - 如何使用带有自定义 UDF 的 DataFrame.explode 将字符串拆分为子字符串?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40931278/

相关文章:

scala - 使用嵌套字段更新数据框 - Spark

scala - Spark SQL datediff 以秒为单位

scala - 减少 Spark 中的数据框以省略空单元格

java - Apache Spark + Java : "java.lang.AssertionError: assertion failed" in ExpressionEncoder

scala - 当它们在未来被包装时,可以使用阻塞的 actor 消息吗?

scala - 升级到 ReactiveMongo 0.12.RC3 时类型不匹配

scala - 不可变对象(immutable对象)的缺点

java - NoClassDefFound : Scala/xml/metadata

java - Spark将多行转换为具有多个集合的单行

java - 如何从 Spark DataFrame 解析具有自定义 json 格式的列