apache-spark - 如何在udf中使用广播集合?

标签 apache-spark apache-spark-sql

如何在 Spark SQL 1.6.1 udf 中使用广播集合。应从主 SQL 调用 Udf,如下所示

sqlContext.sql("""Select col1,col2,udf_1(key) as value_from_udf FROM table_a""")

udf_1() 应该通过广播小集合查找以将值返回到主 sql。

最佳答案

这是 pySpark 中的一个最小的可重现示例,说明了如何使用广播变量来执行查找,并在内部使用 lambda 函数作为 UDF SQL 语句。

# Create dummy data and register as table
df = sc.parallelize([
    (1,"a"),
    (2,"b"),
    (3,"c")]).toDF(["num","let"])
df.registerTempTable('table')

# Create broadcast variable from local dictionary
myDict = {1: "y", 2: "x", 3: "z"}
broadcastVar = sc.broadcast(myDict) 
# Alternatively, if your dict is a key-value rdd, 
# you can do sc.broadcast(rddDict.collectAsMap())

# Create lookup function and apply it
sqlContext.registerFunction("lookup", lambda x: broadcastVar.value.get(x))
sqlContext.sql('select num, let, lookup(num) as test from table').show()
+---+---+----+
|num|let|test|
+---+---+----+
|  1|  a|   y|
|  2|  b|   x|
|  3|  c|   z|
+---+---+----+

关于apache-spark - 如何在udf中使用广播集合?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40673773/

相关文章:

python - 如何从python复制pyspark/hadoop中的文件

scala - Spark2.1.0 不兼容 Jackson 版本 2.7.6

apache-spark - Pyspark 按顺序将多个 csv 文件读取到数据框中

apache-spark - 什么是 Databricks Spark Delta 表?他们是否还存储特定 session 的数据以及如何查看这些增量表及其结构

Pandas 数据框到 RDD

apache-spark - Spark 失败,因为 S3 文件已更新。如何消除这个错误呢?

apache-spark - 出现OutofMemoryError-GC开销限制超出pyspark中的限制

python - 如何在 PySpark 的 UDF 中返回 "Tuple type"?

scala - 为什么在reduce中使用减法结果不一致?

python - pyspark when/otherwise 子句在使用 udf 时失败