if-statement - IF Statement PySpark

Tags: if-statement apache-spark pyspark apache-spark-sql pyspark-sql

My data looks like this:

+----------+-------------+-------+--------------------+--------------+---+
|purch_date|  purch_class|tot_amt|       serv-provider|purch_location| id|
+----------+-------------+-------+--------------------+--------------+---+
|03/11/2017|Uncategorized| -17.53|             HOVER  |              |  0|
|02/11/2017|    Groceries| -70.05|1774 MAC'S CONVEN...|     BRAMPTON |  1|
|31/10/2017|Gasoline/Fuel|    -20|              ESSO  |              |  2|
|31/10/2017|       Travel|     -9|TORONTO PARKING A...|      TORONTO |  3|
|30/10/2017|    Groceries|  -1.84|         LONGO'S # 2|              |  4|
+----------+-------------+-------+--------------------+--------------+---+
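For reference, a minimal sketch that recreates this sample (it assumes an existing SparkSession named spark; the truncated strings are kept exactly as shown in the table):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Column names mirror the table above; empty strings stand in for the blank cells.
df_4 = spark.createDataFrame(
    [
        ("03/11/2017", "Uncategorized", -17.53, "HOVER", "", 0),
        ("02/11/2017", "Groceries", -70.05, "1774 MAC'S CONVEN...", "BRAMPTON", 1),
        ("31/10/2017", "Gasoline/Fuel", -20.0, "ESSO", "", 2),
        ("31/10/2017", "Travel", -9.0, "TORONTO PARKING A...", "TORONTO", 3),
        ("30/10/2017", "Groceries", -1.84, "LONGO'S # 2", "", 4),
    ],
    ["purch_date", "purch_class", "tot_amt", "serv-provider", "purch_location", "id"],
)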

I am trying to create a binary column whose value is defined by the tot_amt column, and to add that column to the data above.
If tot_amt < (-50) I want it to return 0, and if tot_amt > (-50) I want the new column to return 1.

My attempt so far:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def y(row):
    # Flag rows with tot_amt below -50
    if row['tot_amt'] < (-50):
        val = 1
    else:
        val = 0
    return val

y_udf = udf(y, IntegerType())
df_7 = df_4.withColumn('Y', y_udf(df_4['tot_amt'], df_4['purch_class'],
    df_4['purch_date'], df_4['serv-provider'], df_4['purch_location']))
display(df_7)

The error message I receive:

SparkException: Job aborted due to stage failure: Task 0 in stage 67.0 failed 
1 times, most recent failure: Lost task 0.0 in stage 67.0 (TID 85, localhost, 
executor driver): org.apache.spark.api.python.PythonException: Traceback (most 
recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 177, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/databricks/spark/python/pyspark/worker.py", line 104, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/databricks/spark/python/pyspark/worker.py", line 71, in <lambda>
return lambda *a: f(*a)
TypeError: y() takes exactly 1 argument (2 given)

Best Answer

How to make it work (via struct)

A UDF built from a one-argument function cannot be called with multiple columns; that mismatch is what raises the TypeError above. Wrap the columns you need in a single struct instead:

from pyspark.sql.functions import struct

df_4.withColumn("y", y_udf(
    # Include columns you want
    struct(df_4['tot_amt'], df_4['purch_class'])
))
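Because struct preserves the column names as field names, the row['tot_amt'] lookup inside y keeps working: the packed columns arrive in the UDF as a single Row argument, so the call site and the function signature agree.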

What makes more sense

Since the logic only needs tot_amt, define the UDF to take just that one column:
y_udf = udf(lambda y: 1 if y < -50 else 0, IntegerType())

df_4.withColumn("y", y_udf('tot_amt'))

How it should be done, with no Python UDF at all (when/otherwise builds a native column expression):
from pyspark.sql.functions import when

df_4.withColumn("y", when(df_4['tot_amt'] < -50, 1).otherwise(0))
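Applied to the sample data, only the -70.05 grocery purchase falls below -50; a quick sketch of the expected result (values derived from the table above):

df_7 = df_4.withColumn("y", when(df_4['tot_amt'] < -50, 1).otherwise(0))
# Expected per row: -17.53 -> 0, -70.05 -> 1, -20 -> 0, -9 -> 0, -1.84 -> 0
display(df_7)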

Regarding if-statement - IF Statement PySpark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47583007/
