apache-spark - UDF 将单词映射到 Spark 中的术语索引

标签 apache-spark pyspark apache-spark-sql user-defined-functions apache-spark-ml

我正在尝试为我从 LDA 模型获得的术语 ID 获取相应的主题词。

这是Spark中LDA的主题数据框和词分布

topics_desc=ldaModel.describeTopics(20)
topics_desc.show(1)

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[0, 39, 68, 43, 5...|[0.06362107696025...|
+-----+--------------------+--------------------+
only showing top 1 row

现在因为我们有 termIndices 而不是实际的词,我想向这个数据框中添加另一列,这将是相应 termIndices 的词。

现在自从我跑了 CountVectorizer在 Spark 中,我使用该模型并获得如下所示的单词数组列表。

# Creating Term Frequency Vector for each word
cv=CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0)
cvModel=cv.fit(swremoved_df)
cvModel.vocabulary给出单词列表。

所以现在这里是我写的一个 udf 来获取映射:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def term_to_words(termindices):
    """ To get the corresponding words from term indices

    """


    return np.array(cvModel.vocabulary)[termindices]

term_to_words_conv=udf(term_to_words)


topics=topics_desc.withColumn("topics_words",term_to_words_conv("termIndices"))

我将列表转换为 np 数组的原因是因为在 numpy 数组中,我可以通过传递一个在列表中无法做到的索引来进行索引。

但我收到这个错误。我不知道为什么会这样,因为我在这里几乎没有做任何事情。
Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

编辑:

于是想到用mapper函数代替udf

def term_to_words(x):
    """ Mapper function to get the corresponding words for the term index

    """

    row=x.asDict()
    word_list=np.array(cvModel.vocabulary)

    return (row['topic'],row['termIndices'],row['termWeights'],word_list[row[termindices]])


topics_rdd=topics_desc.rdd.map(term_to_words)

/Users/spark2/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    931         # SparkContext#runJob.
    932         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 933         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    934         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    935 

AttributeError: 'NoneType' object has no attribute 'sc'

最佳答案

这里有两个不同的问题:

  • CountVectorizer是 Java 对象的包装器。它不能被序列化并与闭包一起传递。出于同样的原因,您不能在 map 中使用它关闭。
  • 您不能从 UDF 返回 NumPy 类型。

  • 例如,您可以:
    from pyspark.sql.types import ArrayType, StringType
    
    def indices_to_terms(vocabulary):
        def indices_to_terms(xs):
            return [vocabulary[int(x)] for x in xs]
        return udf(indices_to_terms, ArrayType(StringType()))
    

    用法:
    topics_desc.withColumn(
        "topics_words", indices_to_terms(cvModel.vocabulary)("termIndices"))
    

    如果你想使用 NumPy 数组,你必须使用 tolist()从 UDF 返回之前的方法。

    关于apache-spark - UDF 将单词映射到 Spark 中的术语索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42284681/

    相关文章:

    apache-spark - 以编程方式确定 Spark 可用的核心数量和内存量

    apache-spark - 如何将管道分隔的列拆分为多行?

    apache-spark - 在pyspark中将rdd转换为没有模式的数据帧

    python - 类型错误 : 'DataFrameReader' object is not callable

    scala - 如果 csv 列标题包含空格,则在 Spark 中将 csv 转换为 parquet 会出错

    python - 使用列的长度过滤 DataFrame

    java - Spark - 为什么在打印 RDD 之前需要收集()到驱动程序节点?不能并行吗?

    apache-spark - Apache Hadoop的安装目录在哪里

    Pyspark - 日期偏移之间的窗口函数范围

    sql - pyspark 中特定列的每个值始终为 NULL 的列类别