我有一个 pyspark.sql.dataframe,其中每一行都是一篇新闻文章。然后我有一个 RDD 代表每篇文章中包含的单词。我想将单词的 RDD 作为名为“单词”的列添加到我的新文章数据框中。我试过了
df.withColumn('words', words_rdd )
但是我得到了错误
AssertionError: col should be Column
DataFrame 看起来像这样
Articles
the cat and dog ran
we went to the park
today it will rain
但我有 3k 篇新闻文章。
我应用了一个函数来清理文本,例如删除停用词,我有一个如下所示的 RDD:
[[cat, dog, ran],[we, went, park],[today, will, rain]]
我试图让我的 Dataframe 看起来像这样:
Articles Words
the cat and dog ran [cat, dog, ran]
we went to the park [we, went, park]
today it will rain [today, will, rain]
最佳答案
免责声明:
Spark DataFrame
通常没有严格定义的顺序。使用风险自负。
将索引添加到现有的DataFrame
:
from pyspark.sql.types import *
df_index = spark.createDataFrame(
df.rdd.zipWithIndex(),
StructType([StructField("data", df.schema), StructField("id", LongType())])
)
将索引添加到RDD
并转换为DataFrame
:
words_df = spark.createDataFrame(
words_rdd.zipWithIndex(),
StructType([
StructField("words", ArrayType(StringType())),
StructField("id", LongType())
])
)
加入两者并选择必填字段:
df_index.join(words_df, "id").select("data.*", "words")
注意
有不同的解决方案,它们可能适用于特定情况,但不保证性能和/或正确性。这些包括:
- 使用
monotonically_increasing_id
作为join
键 - 在一般情况下不正确。 - 使用
row_number()
窗口函数作为连接键 - Not Acceptable 性能影响,如果未定义特定顺序,通常不正确。 - 在
RDDs
上使用zip
- 当且仅当两个结构具有相同的数据分布时才有效(在这种情况下应该有效)。
注意:
在这种特定情况下,您不需要 RDD
。 pyspark.ml.feature
提供了多种变形金刚
,应该适合您。
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
df = spark.createDataFrame(
["the cat and dog ran", "we went to the park", "today it will rain"],
"string"
).toDF("Articles")
Pipeline(stages=[
RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# | Articles| Tokens| Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...| [went, park]|
# | today it will rain|[today, it, will,...| [today, rain]|
# +-------------------+--------------------+---------------+
可以使用 StopWordsRemover
的 stopWords
参数提供停用词列表,例如:
StopWordsRemover(
inputCol="Tokens",
outputCol="Words",
stopWords=["the", "and", "we", "to", "it"]
)
关于python - 将 PySpark RDD 作为新列添加到 pyspark.sql.dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42124010/