python - PySPark - 确定操作后数据类型的函数

标签 python apache-spark pyspark

在 PySpark 中使用 udf 时,我们必须在创建 udf 时声明操作的返回类型。

现在考虑一个场景,我有两列,我将它们添加以获得第三列。我使用以下内容将它们添加起来

>>> udf_add  = udf(lambda x: x[0]+x[1], IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']).withColumn('Result', udf_add(array('A', 'B'))).show()
+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+

现在假设其中一列是 float ,并且我执行相同的操作

>>> spark.createDataFrame([(101, 1, 16.1)], ['ID', 'A', 'B']).withColumn('Result', udf_add(array('A', 'B'))).show()

+---+---+----+------+
| ID|  A|   B|Result|
+---+---+----+------+
|101|  1|16.1|  null|
+---+---+----+------+

在这种情况下,我得到一个 null,因为我的结果实际上是一个 float ,但我曾向 udf 提到它会是一个 float 。为了克服这个问题,我将 udf 更改为 FloatType 以考虑所有情况

>>> udf_add = udf(lambda x: x[0] + x[1], FloatType())

但是当我向它传递整数值时,它返回 null 值。

>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']).withColumn('Result', udf_add(array('A', 'B'))).show()

+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|  null|
+---+---+---+------+

所以问题是 - pyspark 中是否有一种数据类型包含整数和 float 并且可以处理上述两种情况?

如果没有,有没有办法预先确定或不定义数据类型?

我问这个问题的原因是因为我有多个数据集,并且我想在它们之间执行相同的一组操作。这些值可以是整数或 float 。

最佳答案

is there a data type in pyspark that is inclusive of integer and floats and can handle both the cases above?

没有。如果您希望代码尽可能通用,则将输出转换为可以容纳结果的最通用类型:

 udf(lambda x: float(x[0] + x[1]), DoubleType()) 

或者通过强制转换输入列来确保在调用时始终使用正确的类型。

 udf(lambda x: x[0] + x[1], DoubleType()) 
 ...
 udf_add(array('A', 'B').cast("array<double>")  

显然,您永远不会在生产代码中使用 udf 进行简单的添加。只需在 Column 对象上使用 __add__ (+) 即可。仅当无法提供更有效的解决方案时,我们才使用 udf

If not, is there a way to determine or not define the data type before hand?

没有。返回类型必须事先已知,而 udf 是一个黑匣子 - Spark 无法判断类型应该是什么。

关于python - PySPark - 确定操作后数据类型的函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49815411/

相关文章:

python - 如何在discord.py中获取 channel 的最新消息?

Python 在 PyQt5 应用程序中因 2 个工作线程而崩溃

python - 在 Python 中排列对于 RAM 来说太大的列表

apache-spark - 为什么读取 parquet 文件时会出现 "is not a Parquet file"错误

apache-spark - 在pyspark lambda映射函数中使用keras模型

python - 在数据框中显示 NER Spacy 数据

apache-spark - 使用 AWS Glue 作业在 Redshift 中导入数据时添加时间戳列

python - Spark mllib 预测奇怪的数字或 NaN

amazon-web-services - 如何通过 Cloudformation 在 EMR 上运行 Spark 作业

python - 如何使用spark(python)读取zip文件中CSV文件的内容