python - 使用 PySpark 进行高效的文本预处理(清理、标记化、停用词、词干提取、过滤)

标签 python apache-spark pyspark apache-spark-sql text-processing

最近在《Learning Spark》这本书上开始学习spark。理论上,一切都清楚了,在实践中,我面临这样一个事实,即我首先需要对文本进行预处理,但没有关于这个主题的实际提示。

我考虑的第一件事是现在最好使用 Dataframe 而不是 RDD,所以我的预处理尝试是在数据帧上进行的。

所需操作:

  • 清除标点符号中的文本 (regexp_replace)
  • token 化(Tokenizer)
  • 删除停用词 (StopWordsRemover)
  • Stematization (SnowballStemmer)
  • 过滤短词 (udf)

  • 我的代码是:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf, col, lower, regexp_replace
    from pyspark.ml.feature import Tokenizer, StopWordsRemover
    from nltk.stem.snowball import SnowballStemmer
    
    spark = SparkSession.builder \
        .config("spark.executor.memory", "3g") \
        .config("spark.driver.cores", "4") \
        .getOrCreate()
    df = spark.read.json('datasets/entitiesFull/full').select('id', 'text')
    
    # Clean text
    df_clean = df.select('id', (lower(regexp_replace('text', "[^a-zA-Z\\s]", "")).alias('text')))
    
    # Tokenize text
    tokenizer = Tokenizer(inputCol='text', outputCol='words_token')
    df_words_token = tokenizer.transform(df_clean).select('id', 'words_token')
    
    # Remove stop words
    remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
    df_words_no_stopw = remover.transform(df_words_token).select('id', 'words_clean')
    
    # Stem text
    stemmer = SnowballStemmer(language='english')
    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
    df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean")).select('id', 'words_stemmed')
    
    # Filter length word > 3
    filter_length_udf = udf(lambda row: [x for x in row if len(x) >= 3], ArrayType(StringType()))
    df_final_words = df_stemmed.withColumn('words', filter_length_udf(col('words_stemmed')))
    

    处理需要很长时间,整个文档的大小为 60 GB。使用RDD有意义吗?缓存会有帮助吗?如何优化预处理?

    首先我在本地计算机上测试了实现,然后我将在集群上尝试。本地计算机 - Ubuntu RAM 6Gb,4 个 CPU。也欢迎任何替代解决方案。谢谢!

    最佳答案

    JSON 通常是 Spark 分析中最糟糕的文件格式,尤其是当它是一个 60GB 的 JSON 文件时。 Spark 适用于 1GB Parquet 文件。一些预处理将有很大帮助:

    temp_df = spark.read.json('datasets/entitiesFull/full').select('id', 'text').repartition(60)
    temp_df.write.parquet('some/other/path')
    df = spark.read.parquet('some/other/path')
    # ... continue the rest of the analysis
    
    包装 SnowballStemmer从性能的角度来看,UDF 并不是最好的,但最现实的是​​,除非您习惯用低级 Java 字节码编写算法。我在 ceja 中创建了 Porter Stemming 算法也使用 UDF。
    这是an example of a native implementation of a Spark function .实现是可能的,但并不容易。

    关于python - 使用 PySpark 进行高效的文本预处理(清理、标记化、停用词、词干提取、过滤),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53579444/

    相关文章:

    javascript - Selenium 无法通过 XPath 识别元素

    python - 获取并绘制 pandas 数据框中的唯一值计数

    apache-spark - 在 SparkSession 中连接到远程 Dataproc 主节点

    python - 在 PySpark 中转置 RowMatrix

    apache-spark - Spark中的sqlite数据库: java. lang.ClassNotFoundException : org. sqlite.JDBC

    pyspark - 将小时、分钟和秒添加到 Spark 数据帧

    python - 将 Django 模型移动到它们自己的文件中

    python - 替换列表元素中的空格

    sql - 整理 NoCase 返回错误

    apache-spark - 如何解决java.lang.OutOfMemoryError : Java heap space when train word2vec model in Spark?