python - How to get the top X words from a SparseVector into an array of strings with PySpark

Tags: python apache-spark pyspark

I am currently clustering some text documents. I am using K-means, and I preprocess my data with TF-IDF using PySpark's ML methods. Now I want to get the top 10 words of each cluster:
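(For context, here is a minimal sketch of the kind of pipeline assumed in this question; the docs DataFrame and its text column are illustrative, but it shows where countVectorizerModel and predictions would come from.)

from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF
from pyspark.ml.clustering import KMeans
from pyspark.ml.stat import Summarizer
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType, FloatType

# Tokenize, compute TF-IDF into a "features" column, then cluster on it.
words = Tokenizer(inputCol="text", outputCol="words").transform(docs)
countVectorizerModel = CountVectorizer(inputCol="words", outputCol="tf").fit(words)
tf = countVectorizerModel.transform(words)
tfidf = IDF(inputCol="tf", outputCol="features").fit(tf).transform(tf)
predictions = KMeans(k=5, featuresCol="features").fit(tfidf).transform(tfidf)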

When I do this:

getTopwords_udf = udf(lambda vector: [countVectorizerModel.vocabulary[indice] for indice in vector.toArray().tolist().argsort()[-10:][::-1]], ArrayType(StringType()))

predictions.groupBy("prediction").agg(Summarizer.mean(col("features")).alias("means")) \
    .withColumn("topWord", getTopwords_udf(col('means'))) \
    .select("prediction", "topWord") \
    .show(2, truncate=100)

I get this error:

Could not serialize object: Py4JError: An error occurred while calling o225.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


Traceback (most recent call last):
  File "/opt/bigpipe/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/opt/bigpipe/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 167, in __call__
    judf = self._judf
  File "/opt/bigpipe/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 151, in _judf
    self._judf_placeholder = self._create_judf()
  File "/opt/bigpipe/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 160, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/opt/bigpipe/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 35, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/opt/bigpipe/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/opt/bigpipe/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 597, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o225.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

I thought it was because of the types (DoubleType vs. numpy floats), so I also tried this to see what was going on:

vector_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(FloatType()))
vector2_udf = udf(lambda vector: vector.sort()[:10], ArrayType(FloatType()))

predictions.groupBy("prediction").agg(Summarizer.mean(col("features")).alias("means")) \
    .withColumn("topWord", vector_udf(col('means'))) \
    .withColumn("topWord2", vector2_udf(col('topWord'))) \
    .select("prediction", "topWord", "topWord2") \
    .show(2, truncate=100)

But I get this error: TypeError: 'NoneType' object is not subscriptable
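(As a side note, that second error is plain Python rather than Spark: list.sort() sorts in place and returns None, so slicing its return value fails. A minimal sketch of the same experiment with sorted(), which returns a new list:)

# sorted() returns a new list; reverse=True puts the 10 largest values first.
vector2_udf = udf(lambda values: sorted(values, reverse=True)[:10], ArrayType(FloatType()))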

Best Answer

I've figured out how to get the top X words from a SparseVector into an array of strings with PySpark. The first version failed to serialize because the lambda captured countVectorizerModel, a JVM-backed model that cannot be pickled (hence the __getstate__ error); passing only countVectorizerModel.vocabulary, a plain Python list, into the closure avoids that. Here is my solution, for those who may be interested:

def getTopWordContainer(v):
    # v is the vocabulary: a plain, picklable list of strings
    # (unlike countVectorizerModel itself, which cannot be serialized).
    def getTopWord(vector):
        vectorConverted = vector.toArray().tolist()
        # Indices of the 10 largest components, in descending order.
        listSortedDesc = [i for i, _ in sorted(enumerate(vectorConverted), key=lambda x: x[1])][-10:][::-1]
        return [v[j] for j in listSortedDesc]
    return getTopWord

getTopWordInit = getTopWordContainer(countVectorizerModel.vocabulary)
getTopWord_udf = udf(getTopWordInit, ArrayType(StringType()))

top = predictions.groupBy("prediction").agg(Summarizer.mean(col("features")).alias("means")) \
    .withColumn("topWord", getTopWord_udf(col('means'))) \
    .select("prediction", "topWord")

I'm a Spark beginner, so if you know how to enhance it, please let me know :)
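(One possible enhancement, offered as a sketch rather than a drop-in replacement: since toArray() already yields a numpy array, numpy.argsort can build the index list directly, and the word count can be a parameter. The helper name makeTopWordsUdf and its arguments are illustrative:)

import numpy as np

def makeTopWordsUdf(vocabulary, n=10):
    # vocabulary is a plain list of strings, so the closure pickles cleanly.
    def topWords(vector):
        # Sort ascending, reverse, keep the indices of the n largest components.
        indices = np.argsort(vector.toArray())[::-1][:n]
        return [vocabulary[int(i)] for i in indices]
    return udf(topWords, ArrayType(StringType()))

getTopWord_udf = makeTopWordsUdf(countVectorizerModel.vocabulary, n=10)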

Regarding python - How to get the top X words from a SparseVector into an array of strings with PySpark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55353505/
