arrays - PySpark:替换 ArrayType(String) 中的值

标签 arrays apache-spark dictionary pyspark replace

我目前有以下代码:

def _join_intent_types(df):
  mappings = {
    'PastNews': 'ContextualInformation',
    'ContinuingNews': 'News',
    'KnownAlready': 'OriginalEvent',
    'SignificantEventChange': 'NewSubEvent',
  }
  return df.withColumn('Categories', posexplode('Categories').alias('i', 'val'))\
           .when(col('val').isin(mappings), mappings[col('i')])\
           .otherwise(col('val'))

但我不确定我的语法是否正确。我想做的是对一列列表进行操作,例如:

['EmergingThreats', 'Factoid', 'KnownAlready']

并用提供的字典中的映射替换该数组中的字符串,即

['EmergingThreats', 'Factoid', 'OriginalEvent']

我知道这可以通过 UDF 实现,但我担心这会对性能和可伸缩性产生怎样的影响。

原始表格的示例:

+------------------+-----------------------------------------------------------+
|postID            |Categories                                                 |
+------------------+-----------------------------------------------------------+
|266269932671606786|[EmergingThreats, Factoid, KnownAlready]                   |
|266804609954234369|[Donations, ServiceAvailable, ContinuingNews]              |
|266250638852243457|[EmergingThreats, Factoid, ContinuingNews]                 |
|266381928989589505|[EmergingThreats, MultimediaShare, Factoid, ContinuingNews]|
|266223346520297472|[EmergingThreats, Factoid, KnownAlready]                   |
+------------------+-----------------------------------------------------------+

我希望代码用新映射替换这些数组中的字符串,前提是它们存在于字典中。如果没有,请保持原样:

+------------------+-------------------------------------------------+          
|postID            |Categories                                       |
+------------------+-------------------------------------------------+
|266269932671606786|[EmergingThreats, Factoid, OriginalEvent]        |
|266804609954234369|[Donations, ServiceAvailable, News]              |
|266250638852243457|[EmergingThreats, Factoid, News]                 |
|266381928989589505|[EmergingThreats, MultimediaShare, Factoid, News]|
|266223346520297472|[EmergingThreats, Factoid, OriginalEvent]        |
+------------------+-------------------------------------------------+

最佳答案

使用 explode + collect_list is expensive .这是未经测试的,但应该适用于 Spark 2.4+:

from pyspark.sql.functions import expr

for k, v in mappings.items()
    df = df.withColumn(
        'Categories', 
        expr('transform(sequence(0,size(Categories)-1), x -> replace(Categories[x], {k}, {v}))'.format(k=k, v=v))
    )

您还可以将映射转换为 CASE/WHEN 语句,然后将其应用于 SparkSQL 转换函数:

sql_epxr = "transform(Categories, x -> CASE x {} ELSE x END)".format(" ".join("WHEN '{}' THEN '{}'".format(k,v) for k,v in mappings.items()))
# this yields the following SQL expression:
# transform(Categories, x -> 
#   CASE x 
#     WHEN 'PastNews' THEN 'ContextualInformation' 
#     WHEN 'ContinuingNews' THEN 'News' 
#     WHEN 'KnownAlready' THEN 'OriginalEvent' 
#     WHEN 'SignificantEventChange' THEN 'NewSubEvent' 
#     ELSE x 
#   END
# )

df.withColumn('Categories', expr(sql_epxr)).show(truncate=False)    

对于旧版本的 spark,udf 可能是首选。

关于arrays - PySpark:替换 ArrayType(String) 中的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61268325/

相关文章:

arrays - Ruby 遍历数组获取方法名

java - 一旦计数器达到某个值,就递增数组

PHP 数组索引与数组关联

scala - Spark 嵌套转换 SPARK-5063

apache-spark - kafka max.poll.records 在 Spark 流中不起作用

scala - 处理一行后的 Spark - Scala : Return multiple <key, 值>

python - 计算字典中项目的实例(Pandas)

C++ Map<Vertex*,Position> 并使用 MAP.FIND 与真实顶点比较

node.js - 如何在 MongoDB 中 findOne 之后仅显示数组的一部分?

Python - 在并行字典中查找一个值的平均值