python-3.x - 数据框列的 Pyspark 并行循环

标签 python-3.x dataframe pyspark

我有一个带有封装列的原始 Dataframe pyspark。我需要在所有列上循环以展开这些列。我不知道名称列,它们可能会改变。所以我需要通用算法。问题是我不能使用经典循环 (for),因为我需要并行代码。

数据示例:

Timestamp | Layers
1456982   | [[1, 2],[3,4]]
1486542   | [[3,5], [5,5]]

在层中,它是一个包含其他列(具有自己的列名)的列。我的目标是拥有这样的东西:

Timestamp | label | number1 | text | value
1456982   | 1     | 2       |3     |4
1486542   | 3     | 5       |5     |5

如何使用 pyspark 函数对列进行循环?

多谢指教

最佳答案

您可以对此使用 reduce 函数。我不知道你想做什么,但假设你想在所有列中加 1:

from functools import reduce
from pyspark.sql import functions as F

def add_1(df, col_name):
    return df.withColumn(col_name, F.col(col_name)+1) # using same column name will update column

reduce(add_1, df.columns, df)

编辑: 我不确定在不转换 rdd 的情况下解决它。也许这会有所帮助:

from pyspark.sql import Row

flatF = lambda col: [item for item in l for l in col]
df \
    .rdd \
    .map(row: Row(timestamp=row['timestamp'],
          **dict(zip(col_names, flatF(row['layers']))))) \
    .toDF()

关于python-3.x - 数据框列的 Pyspark 并行循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51023217/

相关文章:

python - 在 Python 中的日期范围内为每年创建新行?

python - 计算重复行并填充列

python - pyspark dataframe 将 json 列转换为新列

python-3.x - Py4JJavaError : An error occurred while calling o26. Parquet 。 (阅读 Parquet 文件)

r - 将数字(integer64 类)UNIX 时间戳转换为日期时间类

apache-spark - PySpark 在 YARN 客户端模式下运行,但在 "User did not initialize spark context!"的集群模式下失败

javascript - 如何在python中将数据从一个文件传递到另一个文件?

python - 如何通过在一个函数中包含所有类型的填充来处理缺失值?

python - 将for循环内部和外部的print stmt连接成python中的一个句子

python-3.x - 如何 Pandas 对具有其中一列作为版本号的数据框进行排序?