python - 如何在 PySpark 中序列化 LabeledPoint RDD?

标签 python apache-spark pyspark apache-spark-mllib

我在使用 PySpark 的 saveAsHadoopFile() 时遇到错误,而在使用 saveAsSequenceFile() 时遇到同样的错误。我需要保存一个 (key,val) 的 RDD,其中 key 是一个字符串,val 是一个 LabeledPoint RDD (label, SparseVector)。错误如下所示。谷歌搜索几个来源似乎我应该能够在 IPython 笔记本中做到这一点。我需要序列化这个大的 RDD,以便我可以用 Java 处理它,因为 Spark 的一些 MLLib 功能还不适用于 python。根据这个post这应该是可行的。

看着这个page我明白了:

_picklable_classes = [
    'LinkedList',
    'SparseVector',
    'DenseVector',
    'DenseMatrix',
    'Rating',
    'LabeledPoint',
]

所以我真的不知道为什么会收到此错误。

Code: labeledDataRDD.saveAsSequenceFile('/tmp/pysequencefile/')

Error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 527.0 failed 1 times, most recent failure: Lost task 0.0 in stage 527.0 (TID 1454, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)

编辑:我发现了这个:

public class More ...ClassDictConstructor implements IObjectConstructor     {
12
13  String module;
14  String name;
15
16  public More ...ClassDictConstructor(String module, String name) {
17      this.module = module;
18      this.name = name;
19  }
20
21  public Object More ...construct(Object[] args) {
22      if (args.length > 0)
23          throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
24      return new ClassDict(module, name);
25  }
26}

我没有直接使用上面的 construct() 方法..所以我不知道为什么我尝试的 saveAs.. 方法在它不需要参数时将参数传递给它。

编辑 2:按照 zero323 的建议(谢谢)解决了一个小故障。当我尝试 zero323 写的内容时出现错误(见下文)。但是,当我派生一个更简单的 RDD 时,它会工作并将这个更简单的 RDD 保存到 .parquet 文件的目录中(将其分解为几个 .parquet 文件)。更简单的RDD如下:

simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")

尝试保存 labeledDataRDD 时出错:

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
    831         raise TypeError("Can not infer schema for type: %s" % type(row))
    832 
--> 833     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    834     return StructType(fields)
    835 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
    808             return _infer_schema(obj)
    809         except TypeError:
--> 810             raise TypeError("not supported type: %s" % type(obj))
    811 
    812 

TypeError: not supported type: <type 'numpy.unicode_'>

最佳答案

问题的根源不是酸洗本身。如果是,您将不会看到 net.razorvine.pickle.PickleException。如果你看一下 saveAsSequenceFile文档你会看到它需要两个步骤:

  1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
  2. Keys and values of this Java RDD are converted to Writables and written out.

你的程序在第一步就失败了,但即使它没有失败,我也不确定什么是预期的 Java 对象以及如何读回它。

我不会使用序列文件,而是将数据简单地写入 Parquet 文件:

from pyspark.mllib.regression import LabeledPoint

rdd = sc.parallelize([
   ("foo", LabeledPoint(1.0, [1.0, 2.0, 3.0])),
   ("bar", LabeledPoint(2.0, [4.0, 5.0, 6.0]))])

sqlContext.createDataFrame(rdd, ("k", "v")).write.parquet("a_parquet_file")

读回并转换:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

val rdd: RDD[(String, LabeledPoint)] = sqlContext.read.parquet("a_parquet_file")
  .select($"k", $"v.label", $"v.features")
  .map{case Row(k: String, label: Double, features: Vector) =>
    (k, LabeledPoint(label, features))}

rdd.sortBy(_._1, false).take(2)

// Array[(String, org.apache.spark.mllib.regression.LabeledPoint)] = 
//  Array((foo,(1.0,[1.0,2.0,3.0])), (bar,(2.0,[4.0,5.0,6.0])))

或者如果您更喜欢类似 Java 的方法:

def rowToKeyLabeledPointPair(r: Row): Tuple2[String, LabeledPoint] = {
  // Vector -> org.apache.spark.mllib.linalg.Vector
  Tuple2(r.getString(0), LabeledPoint(r.getDouble(1), r.getAs[Vector](2)))
}

sqlContext.read.parquet("a_parquet_file")
  .select($"k", $"v.label", $"v.features")
  .map(rowToKeyLabeledPointPair)

编辑

一般来说,NumPy 类型在 Spark SQL 中不支持作为独立值。如果你在 RDD 中有 Numpy 类型,你首先要将它们转换为标准的 Python 类型:

tmp = rdd.map(lambda kv: (str(kv[0]), kv[1]))
sqlContext.createDataFrame(tmp, ("k", "v")).write.parquet("a_parquet_file")

关于python - 如何在 PySpark 中序列化 LabeledPoint RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33675586/

相关文章:

python - 如何在Python中多次运行一个函数

sql - Databricks SQL相当于 "Create Trigger"逻辑?

apache-spark - 为推荐引擎建模隐式和显式行为数据

scala - 如何在 Apache Spark 2.3.1 中映射/转换 ArrayType 中的每个元素

apache-spark - weekofyear() 返回 1 月 1 日看似不正确的结果

python - 合并 PysPark 中的重叠区间

python - python 中的 turtle 迷宫。我不知道如何避免 turtle 穿墙和作弊

python - 在 Numpy 数组中配对相邻值

python - 以 html 电子邮件的形式发送 pandas dataframe 数据

scala - 为什么启动 StreamingContext 失败并显示 "IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute"?