我已将 csv 文件读入 pyspark dataframe
。
现在,如果我在 when()
子句中应用条件,那么当在 runtime
之前给出条件时,它可以正常工作。
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import col
sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# Sample content of csv file
# col1,value
# 1,aa
# 2,bbb
s_df = sql_sc.createDataFrame(pandas_df)
new_df = s_df.withColumn('value', functions.when((col("col1") == 2) | (col("value") == "aa"), s_df.value).otherwise(
2))
new_df.show(truncate=False)
但我需要从列表中动态形成when子句内的条件。
[{'column': 'col1', 'operator': '==', 'value': 2}, {'column': 'value', 'operator': '==', 'value': "aa"}]
有什么办法可以实现这一点吗?
提前致谢。
最佳答案
您可以执行以下操作:
- 动态生成 SQL 字符串,Python 3.6+ 的 f 字符串对此非常方便。
- 将其传递给
pyspark.sql.functions.expr
以生成pyspark.sql.column.Column
。
对于您的示例,类似这样的操作应该有效:
给定 s_df
的架构:
root
|-- col1: long (nullable = false)
|-- value: string (nullable = false)
导入函数并实例化您的条件集合:
[...]
from pyspark.sql.functions import col, expr, when
conditions = [
{'column': 'col1', 'operator': '==', 'value': 3},
{'column': 'value', 'operator': '==', 'value': "'aa'"}
]
- 生成整个 if 语句:
new_df = s_df.withColumn('value', expr(
f"IF({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
"value, 2)")).show()
- 或者仅生成条件,传递给
when
函数。
new_df = s_df.withColumn('value',when(
expr(
f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
),
col("value")).otherwise(2)).show()
关于apache-spark - Pyspark:在运行时动态生成when()子句的条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58762655/