python - 将 PySpark RDD 作为新列添加到 pyspark.sql.dataframe

标签 python apache-spark pyspark

我有一个 pyspark.sql.dataframe,其中每一行都是一篇新闻文章。然后我有一个 RDD 代表每篇文章中包含的单词。我想将单词的 RDD 作为名为“单词”的列添加到我的新文章数据框中。我试过了

df.withColumn('words', words_rdd )

但是我得到了错误

AssertionError: col should be Column

DataFrame 看起来像这样

Articles
the cat and dog ran
we went to the park
today it will rain

但我有 3k 篇新闻文章。

我应用了一个函数来清理文本,例如删除停用词,我有一个如下所示的 RDD:

[[cat, dog, ran],[we, went, park],[today, will, rain]]

我试图让我的 Dataframe 看起来像这样:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]

最佳答案

免责声明:

Spark DataFrame 通常没有严格定义的顺序。使用风险自负。

将索引添加到现有的DataFrame:

from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)

将索引添加到RDD并转换为DataFrame:

words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)

加入两者并选择必填字段:

df_index.join(words_df, "id").select("data.*", "words")

注意

有不同的解决方案,它们可能适用于特定情况,但不保证性能和/或正确性。这些包括:

  • 使用 monotonically_increasing_id 作为 join 键 - 在一般情况下不正确。
  • 使用 row_number() 窗口函数作为连接键 - Not Acceptable 性能影响,如果未定义特定顺序,通常不正确。
  • RDDs 上使用 zip - 当且仅当两个结构具有相同的数据分布时才有效(在这种情况下应该有效)。

注意:

在这种特定情况下,您不需要 RDDpyspark.ml.feature 提供了多种变形金刚,应该适合您。

from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
     ["the cat and dog ran", "we went to the park", "today it will rain"],
         "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"), 
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+

可以使用 StopWordsRemoverstopWords 参数提供停用词列表,例如:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)

关于python - 将 PySpark RDD 作为新列添加到 pyspark.sql.dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42124010/

相关文章:

python - Nose 测试 : how to show customized logging

python - 从多行记录创建 Spark 数据结构

hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?

python - 如何将异步 on_message 函数与 python web 套接字客户端库一起使用?

python - 如何使 pyinotify 在对文件进行任何修改时运行程序?

python - 在python中合并某些列相同而其他列不同的csv文件

java - 关于Spark的持久化机制

apache-spark - 如何使用 PySpark 对 Delta 文件的分区动态执行插入覆盖?

apache-spark - 如何使用 PySpark 将 CSV 文件读取为数据帧时跳过行?

Python - PySpark 的 Pickle Spacy