python - 将函数应用于 Spark DataFrame 中的所有单元格

标签 python pandas apache-spark pyspark apache-spark-sql

我正在尝试将一些 Pandas 代码转换为 Spark 以进行缩放。 myfunc 是一个复杂 API 的包装器,它接受一个字符串并返回一个新字符串(这意味着我不能使用矢量化函数)。

def myfunc(ds):
    for attribute, value in ds.items():
        value = api_function(attribute, value)
        ds[attribute] = value
    return ds

df = df.apply(myfunc, axis='columns')

myfunc 获取一个 DataSeries,将其分解为单独的单元格,为每个单元格调用 API,并构建一个具有相同列名的新 D​​ataSeries。这有效地修改了 DataFrame 中的所有单元格。

我是 Spark 的新手,我想使用 pyspark 翻译这个逻辑。我已将我的 pandas DataFrame 转换为 Spark:

spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)

这是我迷路的地方。我需要一个 UDF,一个 pandas_udf 吗?如何遍历所有单元格并使用 myfunc 为每个单元格返回一个新字符串? spark_df.foreach() 不返回任何内容,也没有 map() 函数。

我可以将 myfuncDataSeries -> DataSeries 修改为 string -> string 如有必要。

最佳答案

选项 1:一次在一列上使用 UDF

最简单的方法是重写您的函数以将字符串作为参数(因此它是 string -> string)并使用 UDF。有一个很好的例子 here .这一次适用于一列。因此,如果您的 DataFrame 具有合理数量的列,您可以一次将 UDF 应用于每一列:

from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)

例子

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
|   1|   4|
|   2|   5|
|   3|   6|
+----+----+

def plus1_udf(x):
    return x + 1
plus1 = spark.udf.register("plus1", plus1_udf)

new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
|          2|          5|
|          3|          6|
|          4|          7|
+-----------+-----------+

选项 2:一次映射整个 DataFrame

map 可用于 Scala DataFrame,但目前在 PySpark 中不可用。 下级RDD API 在 PySpark 中确实有一个 map 函数。因此,如果您有太多列而无法一次转换一个,您可以像这样对 DataFrame 中的每个单元格进行操作:

def map_fn(row):
    return [api_function(x) for (column, x) in row.asDict().items()

column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)

例子

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
   return [value + 1 for (_, value) in row.asDict().items()]

columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
|   2|   5|
|   3|   6|
|   4|   7|
+----+----+

上下文

documentation foreach 只给出了打印的例子,但是我们可以通过查看 code 来验证它确实不返回任何东西。

您可以在 this post 中阅读有关 pandas_udf 的信息,但它似乎最适合向量化函数,正如您所指出的,由于 api_function 而无法使用。

关于python - 将函数应用于 Spark DataFrame 中的所有单元格,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54489344/

相关文章:

python - 根据月份绘制 pandas DataFrame

apache-spark - 如何检查数组列是否在 PySpark 数据框中的另一个列数组内

python - Python 中的 spatialite 的 SQLite load_extension 失败

python - Pandas :取多个数据框的中位数

python - 在 Pandas 中广播列表

apache-spark - 关于数据集中的 kryo 和 java 编码器的问题

maven - Spark Streaming + json4s-jackson 依赖问题

python - 从具有相同索引和列的两个 pandas 数据帧执行计算的最快方法

python - Pandas :将新列添加到数据框,这是索引列的副本

python - 如何在 linux 中安装 python 的 sphinx 文档生成器?