apache-spark - Pyspark:在运行时动态生成when()子句的条件

标签 apache-spark pyspark apache-spark-sql

我已将 csv 文件读入 pyspark dataframe。 现在,如果我在 when() 子句中应用条件,那么当在 runtime 之前给出条件时,它可以正常工作。

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import col

sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# Sample content of csv file
# col1,value
# 1,aa
# 2,bbb

s_df = sql_sc.createDataFrame(pandas_df)

new_df = s_df.withColumn('value', functions.when((col("col1") == 2) | (col("value") == "aa"), s_df.value).otherwise(
    2))

new_df.show(truncate=False)

但我需要从列表中动态形成when子句内的条件。

[{'column': 'col1', 'operator': '==', 'value': 2}, {'column': 'value', 'operator': '==', 'value': "aa"}]

有什么办法可以实现这一点吗?

提前致谢。

最佳答案

您可以执行以下操作:

  1. 动态生成 SQL 字符串,Python 3.6+ 的 f 字符串对此非常方便。
  2. 将其传递给 pyspark.sql.functions.expr 以生成 pyspark.sql.column.Column

对于您的示例,类似这样的操作应该有效:

给定 s_df 的架构:

root
 |-- col1: long (nullable = false)
 |-- value: string (nullable = false)

导入函数并实例化您的条件集合:

[...]
from pyspark.sql.functions import col, expr, when
conditions = [
    {'column': 'col1', 'operator': '==', 'value':  3}, 
    {'column': 'value', 'operator': '==', 'value': "'aa'"}
]
  • 生成整个 if 语句:
new_df = s_df.withColumn('value', expr(
    f"IF({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
    f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
    "value, 2)")).show()
  • 或者仅生成条件,传递给 when 函数。
new_df = s_df.withColumn('value',when(
    expr(
        f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
        f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
    ),
    col("value")).otherwise(2)).show()

关于apache-spark - Pyspark:在运行时动态生成when()子句的条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58762655/

相关文章:

hadoop - 在集群部署模式下运行 spark 提交作业失败但通过客户端

python - Pyspark 错误与 UDF : py4j. Py4JException: 方法 __getnewargs__([]) 不存在错误

java - 如何过滤 Spark/DataFrame 上不可为空的行

java - Spark 作业在输入字符串 ea 的 java 9 NumberFormatException 上失败

python - 使用 join 时,Spark 迭代时间呈指数增长

apache-spark - Apache Spark:核心与执行者

apache-spark - 如何在没有 hive-site.xml 的情况下将 Spark SQL 连接到远程 Hive 元存储(通过节俭协议(protocol))?

scala - 无法覆盖 Spark 2.x 中 CSV 文件的架构

dataframe - Databricks - FileNotFoundException

dataframe - Pyspark 收集列表