apache-spark - pyspark替换列值

标签 apache-spark apache-spark-sql pyspark

我们有以下示例数据框

+-----------+---------------+--------------+
|customer_id|age             |post_code    |
+-----------+---------------+--------------+
|       1001|              50|   BS32 0HW  |
+-----------+---------------+--------------+

然后我们得到一个像这样的字符串

useful_info = 'Customer [customer_id] is [age] years old and lives at [post_code].'

这是示例字符串之一,它可以是其中包含列名称的任何字符串。我只需要将这些列名称替换为实际值。

现在我需要添加 useful_info 列,但替换为列值,即 预期的数据框为:

[Row(customer_id='1001', age=50, post_code='BS32 0HW', useful_info='Customer 1001 is 50 years old and lives at BS32 0HW.')]

有人知道怎么做吗?

最佳答案

这是使用 regexp_replace 的一种方法功能。您可以将要替换的列放在 useful_info 字符串列中 并构建一个如下所示的表达式列:

df = spark.createDataFrame([(1001, 50, "BS32 0HW")], ["customer_id", "age", "post_code"])

list_columns_replace = ["customer_id", "age", "post_code"]

# replace first column in the string
to_replace = f"\\\\[{list_columns_replace[0]}\\\\]"
replace_expr = f"regexp_replace(useful_info, '{to_replace}', {list_columns_replace[0]})"

# loop through other columns to replace and update replacement expression
for c in list_columns_replace[1:]:
    to_replace = f"\\\\[{c}\\\\]"
    replace_expr = f"regexp_replace({replace_expr}, '{to_replace}', {c})"

# add new column 
df.withColumn("useful_info", lit("Customer [customer_id] is [age] years old and lives at [post_code].")) \
  .withColumn("useful_info", expr(replace_expr)) \
  .show(1, False)

#+-----------+---+---------+----------------------------------------------------+
#|customer_id|age|post_code|useful_info                                         |
#+-----------+---+---------+----------------------------------------------------+
#|1001       |50 |BS32 0HW |Customer 1001 is 50 years old and lives at BS32 0HW.|
#+-----------+---+---------+----------------------------------------------------+

关于apache-spark - pyspark替换列值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60179484/

相关文章:

sql - 使用 Scala 中的数据帧在 Spark 1.30 中保存为文本

pyspark - 如何在 Pyspark 中将列表拆分为多列?

postgresql - Spark Dataframe 到 Postgres 使用复制命令 -pyspark

python - 使用 Python 的 Apache Spark TFIDF

apache-spark - 无法实例化提供程序 org.apache.spark.sql.avro.AvroFileFormat

python - 在本地测试 Hive + spark python 程序?

python - 结合 Spark Streaming + MLlib

python - 将时间序列pySpark数据帧拆分为测试和训练,而无需使用随机拆分

sql - 在spark中找到两个表之间最接近的时间

dataframe - Spark : How to aggregate/reduce records based on time difference?