我目前有以下代码:
def _join_intent_types(df):
mappings = {
'PastNews': 'ContextualInformation',
'ContinuingNews': 'News',
'KnownAlready': 'OriginalEvent',
'SignificantEventChange': 'NewSubEvent',
}
return df.withColumn('Categories', posexplode('Categories').alias('i', 'val'))\
.when(col('val').isin(mappings), mappings[col('i')])\
.otherwise(col('val'))
但我不确定我的语法是否正确。我想做的是对一列列表进行操作,例如:
['EmergingThreats', 'Factoid', 'KnownAlready']
并用提供的字典中的映射替换该数组中的字符串,即
['EmergingThreats', 'Factoid', 'OriginalEvent']
我知道这可以通过 UDF 实现,但我担心这会对性能和可伸缩性产生怎样的影响。
原始表格的示例:
+------------------+-----------------------------------------------------------+
|postID |Categories |
+------------------+-----------------------------------------------------------+
|266269932671606786|[EmergingThreats, Factoid, KnownAlready] |
|266804609954234369|[Donations, ServiceAvailable, ContinuingNews] |
|266250638852243457|[EmergingThreats, Factoid, ContinuingNews] |
|266381928989589505|[EmergingThreats, MultimediaShare, Factoid, ContinuingNews]|
|266223346520297472|[EmergingThreats, Factoid, KnownAlready] |
+------------------+-----------------------------------------------------------+
我希望代码用新映射替换这些数组中的字符串,前提是它们存在于字典中。如果没有,请保持原样:
+------------------+-------------------------------------------------+
|postID |Categories |
+------------------+-------------------------------------------------+
|266269932671606786|[EmergingThreats, Factoid, OriginalEvent] |
|266804609954234369|[Donations, ServiceAvailable, News] |
|266250638852243457|[EmergingThreats, Factoid, News] |
|266381928989589505|[EmergingThreats, MultimediaShare, Factoid, News]|
|266223346520297472|[EmergingThreats, Factoid, OriginalEvent] |
+------------------+-------------------------------------------------+
最佳答案
使用 explode
+ collect_list
is expensive .这是未经测试的,但应该适用于 Spark 2.4+:
from pyspark.sql.functions import expr
for k, v in mappings.items()
df = df.withColumn(
'Categories',
expr('transform(sequence(0,size(Categories)-1), x -> replace(Categories[x], {k}, {v}))'.format(k=k, v=v))
)
您还可以将映射转换为 CASE/WHEN 语句,然后将其应用于 SparkSQL 转换函数:
sql_epxr = "transform(Categories, x -> CASE x {} ELSE x END)".format(" ".join("WHEN '{}' THEN '{}'".format(k,v) for k,v in mappings.items()))
# this yields the following SQL expression:
# transform(Categories, x ->
# CASE x
# WHEN 'PastNews' THEN 'ContextualInformation'
# WHEN 'ContinuingNews' THEN 'News'
# WHEN 'KnownAlready' THEN 'OriginalEvent'
# WHEN 'SignificantEventChange' THEN 'NewSubEvent'
# ELSE x
# END
# )
df.withColumn('Categories', expr(sql_epxr)).show(truncate=False)
对于旧版本的 spark,udf
可能是首选。
关于arrays - PySpark:替换 ArrayType(String) 中的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61268325/