scala - Spark udf 初始化

标签 scala apache-spark apache-spark-sql user-defined-functions

我想在 Spark SQL 中创建一个自定义的基于正则表达式的 UDF。我的偏好是创建一个驻留内存

 Map[String,Pattern]

其中 Pattern 是指字符串键的编译正则表达式版本。但要做到这一点,我们需要将 map 创建放入 UDF 的“初始化”函数中。

那么 Spark udf 是否有任何结构支持跨调用的持久状态(通过 Spark SQL)?

请注意,HIVE 确实支持 UDF 的生命周期。我用它来生成解析树作为初始化的一部分,这样 UDF 的实际调用是针对闪电般快速的树,不涉及解析。

最佳答案

让我们从导入和一些虚拟数据开始:

import org.apache.spark.sql.functions.udf
import scala.util.matching.Regex
import java.util.regex.Pattern

val df = sc.parallelize(Seq(
  ("foo", "this is bar"), ("foo", "this is foo"),
  ("bar", "foobar"), ("bar", "foo and foo")
)).toDF("type", "value")

和 map :

val patterns: Map[String, Pattern] = Seq(("foo", ".*foo.*"), ("bar", ".*bar.*"))
   .map{case (k, v) => (k, new Regex(v).pattern)}
   .toMap

现在我看到两个不同的选项:
  • 使 patterns 成为 udf 中引用的广播变量

    val patternsBd = sc.broadcast(patterns)
    
    val typeMatchedViaBroadcast = udf((t: String, v: String) =>
      patternsBd.value.get(t).map(m => m.matcher(v).matches))
    
    df.withColumn("match", typeMatchedViaBroadcast($"type", $"value")).show
    
    // +----+-----------+-----+
    // |type|      value|match|
    // +----+-----------+-----+
    // | foo|this is bar|false|
    // | foo|this is foo| true|
    // | bar|     foobar| true|
    // | bar|foo and foo|false|
    // +----+-----------+-----+
    
  • 在闭包内传递 map

    def makeTypeMatchedViaClosure(patterns: Map[String, Pattern]) = udf(
      (t: String, v: String) => patterns.get(t).map(m => m.matcher(v).matches))
    
    val typeMatchedViaClosure = makeTypeMatchedViaClosure(patterns)
    
    df.withColumn("match", typeMatchedViaClosure($"type", $"value")).show
    
    // +----+-----------+-----+
    // |type|      value|match|
    // +----+-----------+-----+
    // | foo|this is bar|false|
    // | foo|this is foo| true|
    // | bar|     foobar| true|
    // | bar|foo and foo|false|
    // +----+-----------+-----+
    
  • 关于scala - Spark udf 初始化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33664991/

    相关文章:

    apache-spark - 带有 Spark 2.1 结构化流的 Kafka - 无法反序列化

    scala - 使用Spark通过s3a将 Parquet 文件写入s3非常慢

    python - 删除 Spark 数据框中包含句点的列名称

    scala - 更改 Spark 中的日期格式会返回不正确的结果

    java - 无法在泛型类中将 Double 转换为 Float

    scala - 如何使用 sbt-assembly 插件强制将提供的依赖项添加到 fat jar?

    java - IntelliJ : scala: javac: invalid source release: 1. 7

    scala - 用于处理接收中的异步操作的 Akka 模式

    apache-spark - 我可以将 pyspark 数据框另存为哪些文件格式?

    斯卡拉 Spark UDF ClassCastException : WrappedArray$ofRef cannot be cast to [Lscala. Tuple2