apache-spark - 如何为元组列表创建 PySpark 模式?

标签 apache-spark pyspark schema

以下元组列表的正确 PySpark 架构应该是什么?我想将架构应用于以下数据:

[('a', 0.0), ('b', 6), ('c', 44), ('d', 107), ('e', 0), ('f', 3), ('g', 4), ('h', 0.025599999353289604), ('i', 0.03239999711513519), ('j', -0.03205680847167969), ('k', 0.10429033637046814), ('l', (34.190006256103516, 31.09000015258789, 31.099994659423828)), ('m', (-9.32000732421875, -9.32000732421875, -11.610000610351562)) ]

我想要以下格式的结果: Format

最佳答案

虽然我想建议另一种方法,但 Tanjin 的答案应该有效。而不是找出应该添加到架构中的列数来创建数组/列表类型的列。下一个代码将您的数据转换为一个 rdd,而不是包含 [key, value] 行的元组,其中 value 是一个 double 列表。然后您可以轻松应用下面的架构。

def test():
    l = [('a', 0.0), 
    ('b', 6), 
    ('c', 44), 
    ('d', 107), 
    ('e', 0), 
    ('f', 3), 
    ('g', 4), 
    ('h', 0.025599999353289604), 
    ('i', 0.03239999711513519), 
    ('j', -0.03205680847167969), 
    ('k',0.10429033637046814), 
    ('l',(34.190006256103516, 31.09000015258789, 31.099994659423828)), 
    ('m',(-9.32000732421875, -9.32000732421875, -11.610000610351562))]

    # this schema should work for all your cases 
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("num_list", ArrayType(DoubleType(), True), True)
    ])

    rdd = spark.sparkContext.parallelize(l).map(lambda r: (r[0], to_float_list(r[1])))

    df = spark.createDataFrame(rdd, schema)

    df.show(100, False)

def to_float_list(value):
    if type(value) is tuple:  
        return list(map(float, value))

    return [float(value)]

注意 to_float_list 函数接受元组或数字并将其转换为 double 列表。这将输出:

+---+-----------------------------------------------------------+
|id |num_list                                                   |
+---+-----------------------------------------------------------+
|a  |[0.0]                                                      |
|b  |[6.0]                                                      |
|c  |[44.0]                                                     |
|d  |[107.0]                                                    |
|e  |[0.0]                                                      |
|f  |[3.0]                                                      |
|g  |[4.0]                                                      |
|h  |[0.025599999353289604]                                     |
|i  |[0.03239999711513519]                                      |
|j  |[-0.03205680847167969]                                     |
|k  |[0.10429033637046814]                                      |
|l  |[34.190006256103516, 31.09000015258789, 31.099994659423828]|
|m  |[-9.32000732421875, -9.32000732421875, -11.610000610351562]|
+---+-----------------------------------------------------------+

关于apache-spark - 如何为元组列表创建 PySpark 模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54967720/

相关文章:

apache-spark - 从 pyspark 数据框中的结构类型获取字段值

file - Spark 分区/集群执行

scala - Spark : group concat equivalent in scala rdd

apache-spark - PySpark:如何加速 sqlContext.read.json?

python - 在个人计算机上加载 PipelineModel 时出现 ValueError

json - pydantic 与 json 模式相比的优缺点

sql - 如何提高多对多 SQL 查询的性能?

mysql - 将多种类型的用户放入数据库的有效方法是什么

elasticsearch - 如何将es.nodes参数设置为Spark的多个Elasticsearch节点?

python - pyspark 中的 Pandas UDF