python - 在spark Dataframe中动态创建多列

标签 python apache-spark dynamic pyspark multiple-columns

我有字典,其中包含以下信息:

dict_segs = {'key1' : {'a' : {'col1' : 'value1', 'col2' : 'value2', 'col3': 'value3'}, 
                'b' : {'col2' : 'value2', 'col3' : 'value3'}, 
                'c' : {'col1' : 'value1'}},
        'key2' : {'d' : {'col3' : 'value3', 'col2' : 'value2'},
                'f' : {'col1' : 'value1', 'col4' : 'value4'}}}

待办事项:

键基本上是“段”,其中底层字典,即 key1 的 a、b、c 是“子段”。对于每个子分段,过滤条件可在子分段的基础字典中使用,即 a、b、c、d、f。另外,子段字典键的过滤条件也是pyspark dataframe的列名。

我想在 pyspark 数据框中为每个段一次性创建子段列,并且满足过滤条件时每个子段列的值为 1,否则为 0,类似,

for item in dict_segs:
    pyspark_dataframe.withColumn(*dict_segs[item].keys(), when(meeting filter criteria with respect to each key), 1).otherwise(0))

在进行研究时,我能够在 scala 中找到类似的东西,但是列过滤条件是静态的,但对于上述逻辑,即动态的。请参阅下面的 scala 逻辑,

Spark/Scala repeated calls to withColumn() using the same function on multiple columns

需要支持根据上面的伪代码导出每个段的上述逻辑。

谢谢。

最佳答案

您正在寻找 select 语句:

让我们创建一个示例数据框:

df = spark.createDataFrame(
    sc.parallelize([["value" + str(i) for i in range(1, 5)], ["value" + str(i) for i in range(5, 9)]]), 
    ["col" + str(i) for i in range(1, 5)]
)

+------+------+------+------+
|  col1|  col2|  col3|  col4|
+------+------+------+------+
|value1|value2|value3|value4|
|value5|value6|value7|value8|
+------+------+------+------+

现在对于字典中的所有、对于dict_seg[key]中的所有子键以及对于所有dict_seg[key][subkey] 中:

import pyspark.sql.functions as psf
df.select(
    ["*"] +
    [
        eval('&'.join([
            '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys()
        ])).cast("int").alias(sk) 
        for k in dict_segs.keys() for sk in dict_segs[k].keys()
    ]
).show()

+------+------+------+------+---+---+---+---+---+
|  col1|  col2|  col3|  col4|  a|  b|  c|  d|  f|
+------+------+------+------+---+---+---+---+---+
|value1|value2|value3|value4|  1|  1|  1|  1|  1|
|value5|value6|value7|value8|  0|  0|  0|  0|  0|
+------+------+------+------+---+---+---+---+---+
  • "*" 允许您保留所有以前存在的列,它可以用 df.columns 替换。
  • alias(sk) 允许您为新列指定名称 sk
  • cast("int") 将 boolean 类型更改为 int 类型

我不太明白为什么你有一个深度 3 的字典,似乎 key1, key2 并不是很有用。

关于python - 在spark Dataframe中动态创建多列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46155079/

相关文章:

python - 如何在 seaborn.heatmap 中设置颜色映射?

python - OpenERP : fetch record with same name

python - css 未加载到我的 Django 登录和管理页面中

python - Azure Kusto Spark 在写入中重写 ingestion_time()

python - 将 PySpark RDD 作为新列添加到 pyspark.sql.dataframe

python - Spark窗口函数,根据数据集中的值创建排名列

python - Selenium +Python+单元测试:Error message when executing Test "Other element would receive the click"

c - 如何在 C 的过程中创建动态数组?

c - C 中动态分配的字符串矩阵

html - dart HTML TableCellElement - 无法获取或设置 colspan