我在使用 PySpark 的 saveAsHadoopFile() 时遇到错误,而在使用 saveAsSequenceFile() 时遇到同样的错误。我需要保存一个 (key,val) 的 RDD,其中 key 是一个字符串,val 是一个 LabeledPoint RDD (label, SparseVector)。错误如下所示。谷歌搜索几个来源似乎我应该能够在 IPython 笔记本中做到这一点。我需要序列化这个大的 RDD,以便我可以用 Java 处理它,因为 Spark 的一些 MLLib 功能还不适用于 python。根据这个post这应该是可行的。
看着这个page我明白了:
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
]
所以我真的不知道为什么会收到此错误。
Code: labeledDataRDD.saveAsSequenceFile('/tmp/pysequencefile/')
Error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 527.0 failed 1 times, most recent failure: Lost task 0.0 in stage 527.0 (TID 1454, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
编辑:我发现了这个:
public class More ...ClassDictConstructor implements IObjectConstructor {
12
13 String module;
14 String name;
15
16 public More ...ClassDictConstructor(String module, String name) {
17 this.module = module;
18 this.name = name;
19 }
20
21 public Object More ...construct(Object[] args) {
22 if (args.length > 0)
23 throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
24 return new ClassDict(module, name);
25 }
26}
我没有直接使用上面的 construct() 方法..所以我不知道为什么我尝试的 saveAs.. 方法在它不需要参数时将参数传递给它。
编辑 2:按照 zero323 的建议(谢谢)解决了一个小故障。当我尝试 zero323 写的内容时出现错误(见下文)。但是,当我派生一个更简单的 RDD 时,它会工作并将这个更简单的 RDD 保存到 .parquet 文件的目录中(将其分解为几个 .parquet 文件)。更简单的RDD如下:
simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")
尝试保存 labeledDataRDD 时出错:
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
831 raise TypeError("Can not infer schema for type: %s" % type(row))
832
--> 833 fields = [StructField(k, _infer_type(v), True) for k, v in items]
834 return StructType(fields)
835
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
808 return _infer_schema(obj)
809 except TypeError:
--> 810 raise TypeError("not supported type: %s" % type(obj))
811
812
TypeError: not supported type: <type 'numpy.unicode_'>
最佳答案
问题的根源不是酸洗本身。如果是,您将不会看到 net.razorvine.pickle.PickleException
。如果你看一下 saveAsSequenceFile
文档你会看到它需要两个步骤:
- Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
- Keys and values of this Java RDD are converted to Writables and written out.
你的程序在第一步就失败了,但即使它没有失败,我也不确定什么是预期的 Java 对象以及如何读回它。
我不会使用序列文件,而是将数据简单地写入 Parquet 文件:
from pyspark.mllib.regression import LabeledPoint
rdd = sc.parallelize([
("foo", LabeledPoint(1.0, [1.0, 2.0, 3.0])),
("bar", LabeledPoint(2.0, [4.0, 5.0, 6.0]))])
sqlContext.createDataFrame(rdd, ("k", "v")).write.parquet("a_parquet_file")
读回并转换:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rdd: RDD[(String, LabeledPoint)] = sqlContext.read.parquet("a_parquet_file")
.select($"k", $"v.label", $"v.features")
.map{case Row(k: String, label: Double, features: Vector) =>
(k, LabeledPoint(label, features))}
rdd.sortBy(_._1, false).take(2)
// Array[(String, org.apache.spark.mllib.regression.LabeledPoint)] =
// Array((foo,(1.0,[1.0,2.0,3.0])), (bar,(2.0,[4.0,5.0,6.0])))
或者如果您更喜欢类似 Java 的方法:
def rowToKeyLabeledPointPair(r: Row): Tuple2[String, LabeledPoint] = {
// Vector -> org.apache.spark.mllib.linalg.Vector
Tuple2(r.getString(0), LabeledPoint(r.getDouble(1), r.getAs[Vector](2)))
}
sqlContext.read.parquet("a_parquet_file")
.select($"k", $"v.label", $"v.features")
.map(rowToKeyLabeledPointPair)
编辑
一般来说,NumPy 类型在 Spark SQL 中不支持作为独立值。如果你在 RDD 中有 Numpy 类型,你首先要将它们转换为标准的 Python 类型:
tmp = rdd.map(lambda kv: (str(kv[0]), kv[1]))
sqlContext.createDataFrame(tmp, ("k", "v")).write.parquet("a_parquet_file")
关于python - 如何在 PySpark 中序列化 LabeledPoint RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33675586/