python - 有没有办法将字典类型的列添加到pyspark中的spark数据帧?

标签 python pyspark apache-spark-sql

这就是我在 pyspark 中创建具有原始数据类型的数据帧的方式:

from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType
fields = [StructField('column1', IntegerType(), True), StructField('column2', IntegerType(), True)]
schema = StructType(fields)
df = spark.createDataFrame([], schema)
values = [tuple([i]) +
          tuple([i])
          for i in range(3)]
df = spark.createDataFrame(values, schema)

现在,如果我想要包含字典数据的第三列,例如:{"1": 1.0, "2": 2.0, "3": 3.0},我该怎么办?
我想创建这个数据框:
+--------------------+-----------------+------------------------------+
|column1             |column2          |column3                       |
+--------------------+-----------------+------------------------------+
|1                   |1                |{"1": 1.0, "2": 1.0, "3": 1.0}|
+--------------------+-----------------+------------------------------+
|2                   |2                |{"1": 2.0, "2": 2.0, "3": 2.0}|
+--------------------+-----------------+------------------------------+
|3                   |3                |{"1": 3.0, "2": 3.0, "3": 3.0}|
+--------------------+-----------------+------------------------------+

有一个 MapType 似乎很有帮助,但我不知道如何使用它?

假设数据框已创建,如何根据第三列对其进行过滤,给出一个字典来选择具有该字典值的数据框的行?

最佳答案

示例如何创建:

from pyspark.sql.types import MapType, IntegerType, DoubleType, StringType, StructType, StructField
import pyspark.sql.functions as f

schema = StructType([
            StructField('column1', IntegerType()),
            StructField('column2', IntegerType()),
            StructField('column3', MapType(StringType(), DoubleType()))])

data = [(1, 2, {'a':3.5, 'b':4.2}), (4, 8, {'b':3.7, 'e':4.9})]
df = spark.createDataFrame(data, schema=schema)
df.show()

输出:
+-------+-------+--------------------+
|column1|column2|             column3|
+-------+-------+--------------------+
|      1|      2|[a -> 3.5, b -> 4.2]|
|      4|      8|[e -> 4.9, b -> 3.7]|
+-------+-------+--------------------+

关于如何过滤 DataFrame 只留下具有特定键的元素的示例(假设您在 map 中没有空值并且您的 Spark 版本是 2.4+,因为早期版本没有 element_at ):
filtered_df = df.where(f.element_at(df.column3, 'a').isNotNull())

输出:
+-------+-------+--------------------+
|column1|column2|             column3|
+-------+-------+--------------------+
|      1|      2|[a -> 3.5, b -> 4.2]|
+-------+-------+--------------------+

我可能误解了你的问题 - 如果你的意图是只留下 map 列等于你拥有的特定字典的行,那就有点棘手了。据我所知,Spark 没有对字典类型进行比较操作(这是有点不寻常的操作)。有一种方法可以使用 udf 来实现它,但效率不会很高。代码可能如下所示:
from pyspark.sql.types import MapType, IntegerType, DoubleType, StringType, StructType, StructField, BooleanType
my_dict = {'b':2.7, 'e':4.9}

from pyspark.sql.functions import udf
def map_equality_comparer(my_dict):
    @udf(BooleanType())
    def comparer(m):
        if len(m) != len(my_dict): return False
        for k, v in m.items():
            if my_dict.get(k) != v: return False
        return True
    return comparer

filtered_df = df.where(map_equality_comparer(my_dict)(df.column3))
filtered_df.show()

如果这对您来说太慢了,您可以考虑创建字典的规范表示并进行比较(例如将字典转换为键值对的排序数组并根据这些数组的相等性进行过滤)。

关于python - 有没有办法将字典类型的列添加到pyspark中的spark数据帧?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62121832/

相关文章:

python - Airflow DataProcPySparkOperator 不考虑全局区域以外的集群

python opencv imwrite ...找不到参数

python - 在 SQLAlchemy 中使用 1-N 关系

sql-server - Spark 在执行 jdbc 保存时给出空指针异常

python - 将张量保存为 JPEG 图像 - Python/TensorFlow

apache-spark - 如何在 Google Dataproc 中发送失败作业的警报通知?

apache-spark - 如何删除 pyspark 中的常量列,但不删除具有空值和其他值的列?

apache-spark - 使用 PySpark 直接 Kafka Stream (Apache Spark 1.6)

apache-spark - Spark-Csv 写引用模式不起作用

sql - Apache Spark SQL 按范围分组数据