apache-spark - 如何删除 PySpark 中少于三个字母的单词?

标签 apache-spark pyspark apache-spark-sql

我有一个“文本”列,其中存储了标记数组。如何过滤所有这些数组以使标记的长度至少为三个字母?

from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

columns = ['id', 'text']
vals = [
    (1, ['I', 'am', 'good']),
    (2, ['You', 'are', 'ok']),
]

df = spark.createDataFrame(vals, columns)
df.show()

# Had tried this but have TypeError: Column is not iterable
# df_clean = df.select('id', regexp_replace('text', [len(word) >= 3 for word 
# in col('text')], ''))
# df_clean.show()

我希望看到:

id  |  text  
1   |  [good]
2   |  [You, are]

最佳答案

这样就可以了,您可以决定是否排除行,我添加了一个额外的列并过滤掉了,但选项是您的:

from pyspark.sql import functions as f

columns = ['id', 'text']
vals = [
        (1, ['I', 'am', 'good']),
        (2, ['You', 'are', 'ok']),
        (3, ['ok'])
       ]

df = spark.createDataFrame(vals, columns)
#df.show()

df2 = df.withColumn("text_left_over", f.expr("filter(text, x -> not(length(x) < 3))"))
df2.show()

# This is the actual piece of logic you are looking for.
df3 = df.withColumn("text_left_over", f.expr("filter(text, x -> not(length(x) < 3))")).where(f.size(f.col("text_left_over")) > 0).drop("text")
df3.show()

返回:

+---+--------------+--------------+
| id|          text|text_left_over|
+---+--------------+--------------+
|  1| [I, am, good]|        [good]|
|  2|[You, are, ok]|    [You, are]|
|  3|          [ok]|            []|
+---+--------------+--------------+

+---+--------------+
| id|text_left_over|
+---+--------------+
|  1|        [good]|
|  2|    [You, are]|
+---+--------------+

关于apache-spark - 如何删除 PySpark 中少于三个字母的单词?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53485273/

相关文章:

apache-spark - createOrReplaceTempView 和 registerTempTable 的区别

java - 如何让Spark作业完成后自动重启?

hadoop - 如何在 Apache Spark 中使用 Hadoop InputFormats?

python - PythonRDD 的 rdd 和 ParallelCollectionRDD 有什么区别

python - 如何在 Hadoop 环境中重新训练 Inception 图像分类器

apache-spark - Spark DataFrame 缓存大型临时表

scala - 在Spark的RDD中更新值(value)的有效方法是什么?

python - 使用窗口函数时出现 pyspark 错误(Spark 2.1.0 报告未找到列的问题)?

apache-spark - 在 pyspark 中获得不同连接输出的最佳方法是什么?

scala - 将 UTC unix 时间转换为同一时区的时间戳