python - 如何将 numpy.array 作为新列添加到 pyspark.SQL DataFrame?

标签 python apache-spark pyspark apache-spark-sql

这是创建 pyspark.sql DataFrame 的代码

import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
df = pd.DataFrame(np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]]), columns=['a','b','c'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)

所以 sparkdf 看起来像

a  b  c
1  2  3
4  5  6
7  8  9
10 11 12

现在我想添加一个 numpy 数组(甚至列表)作为新列

new_col = np.array([20,20,20,20])

但是标准方式

sparkdf = sparkdf.withColumn('newcol', new_col)

失败。 可能 udf 是要走的路,但我不知道如何创建一个 udf 为每个 DataFrame 行分配一个不同的值,即遍历 new_col。 我看过其他 pyspark 和 pyspark.sql 但找不到解决方案。 此外,我需要留在 pyspark.sql 中,而不是 scala 解决方案。谢谢!

最佳答案

假设数据框已排序以匹配数组中值的顺序,您可以压缩 RDD 并重建数据框,如下所示:

n = sparkdf.rdd.getNumPartitions()

# Parallelize and cast to plain integer (np.int64 won't work)
new_col = sc.parallelize(np.array([20,20,20,20]), n).map(int) 

def process(pair):
    return dict(pair[0].asDict().items() + [("new_col", pair[1])])

rdd = (sparkdf
    .rdd # Extract RDD
    .zip(new_col) # Zip with new col
    .map(process)) # Add new column

sqlContext.createDataFrame(rdd) # Rebuild data frame

您还可以使用连接:

new_col = sqlContext.createDataFrame(
    zip(range(1, 5), [20] * 4),
    ("rn", "new_col"))

sparkdf.registerTempTable("df")

sparkdf_indexed = sqlContext.sql(
    # Make sure we have specific order and add row number
    "SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df")

(sparkdf_indexed
    .join(new_col, new_col.rn == sparkdf_indexed.rn)
    .drop(new_col.rn))

但窗口函数组件不可扩展,应避免使用较大的数据集。

当然,如果您只需要一个单一值的列,您可以简单地使用 lit

import pyspark.sql.functions as f
sparkdf.withColumn("new_col", f.lit(20))

但我认为情况并非如此。

关于python - 如何将 numpy.array 作为新列添加到 pyspark.SQL DataFrame?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31930364/

相关文章:

python - 如何在pyspark中对数组中的标签进行编码

python - 为什么我们需要指定标准的 Lark 词法分析器来捕获评论终端?

scala - AggregateByKey 在抽象类中时无法编译

python - 如何在pyspark脚本中访问SparkContext

python - PySpark - 显示数据框中列数据类型的计数

python - 如何在 PySpark DataFrame 中将 ArrayType 转换为 DenseVector?

python - 如何在python中创建一个枢轴嵌套字典

python - 在 Python 中从 C++ 继承 Base,使用 SWIG 调用抽象方法

python - Google App Engine remote_api 使用 remote_api_shell.py 返回 404

python - 使用 pyspark 将 csv 文件转换为 parquet 文件 : Py4JJavaError: An error occurred while calling o347. parquet 错误