I have a custom Writable class in Hadoop that is saved as a sequence file, as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class ABC implements Writable {
    private byte[] myId;
    private byte[] myType;

    // Constructor and other methods

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(myId.length);
        out.write(myId);
        out.writeInt(myType.length);
        out.write(myType);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        myId = new byte[in.readInt()];
        in.readFully(myId);
        myType = new byte[in.readInt()];
        in.readFully(myType);
    }
}
I want to read the sequence file with PySpark and get the data back. I tried the following three approaches. First, reading the file directly:

sc.sequenceFile("file:///Test.seq", keyClass="ABC", valueClass="ABC")
but got:
object not serializable (class: ABC, value: ABC@451de3ec)
The official programming guide (http://spark.apache.org/docs/latest/programming-guide.html#external-datasets) says:
If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite’s pickler. A Converter trait is provided for this. Simply extend this trait and implement your transformation code in the convert method.
So, second, I implemented a converter as follows:
import test.ABC
import java.io.DataInput
import org.apache.spark.api.python.Converter

/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts data
 * to ABC
 */
class DataToABCConverter extends Converter[Any, ABC] {
  override def convert(obj: Any): ABC = {
    if (obj == null) {
      return null
    }
    val in = obj.asInstanceOf[DataInput]
    val abc = new ABC()
    abc.readFields(in)
    abc
  }
}
In PySpark I use the following call:

sc.sequenceFile("file:///Test.seq", keyClass="ABC", valueClass="ABC", keyConverter="DataToABCConverter", valueConverter="DataToABCConverter")
but get the following error:

java.lang.ClassCastException: ABC cannot be cast to java.io.DataInput
It looks like the converter's input is my ABC class rather than java.io.DataInput, so I cannot call readFields to get the data. So, third, I added a getID() method that returns the bytes and changed the converter as follows:

class DataToChunkConverter extends Converter[Any, BytesWritable] {
  override def convert(obj: Any): BytesWritable = {
    if (obj == null) {
      return null
    }
    val abc = obj.asInstanceOf[ABC]
    val idd = abc.getID()
    new BytesWritable(idd)
  }
}
Then I run pyspark with:

pyspark --master=local[8] --conf "spark.kryo.classesToRegister=org.apache.hadoop.io.BytesWritable" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
but get the following error:

Failed to pickle Java object as value: BytesWritable, falling back
to 'toString'. Error: couldn't pickle object of type class org.apache.hadoop.io.BytesWritable
So my question is: what is the right way to read a custom sequence file in PySpark? And which types can be serialized across to PySpark? Any advice is appreciated!
Best Answer
After some experimentation (following the third approach), it turns out that it works if a native Scala or Java type is used as the converter's return type. For example, with Array[Byte] as the return type, PySpark can successfully get the data:
class DataToChunkConverter extends Converter[Any, Array[Byte]] {
  override def convert(obj: Any): Array[Byte] = {
    if (obj == null) {
      return null
    }
    val abc = obj.asInstanceOf[ABC]
    val idd = abc.getID()
    idd
  }
}
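To actually read the file from PySpark with this converter, the converter (and the ABC class) has to be on Spark's classpath, and the converter is passed by class name to sc.sequenceFile. Below is a minimal sketch of the PySpark side, assuming the classes are packaged into a jar named abc-converters.jar and live in the default package (jar name and unqualified class names are assumptions; adjust them to your build):

# Launch the shell with the converter jar on the driver and executor classpath:
#   pyspark --master local[8] --jars abc-converters.jar --driver-class-path abc-converters.jar

rdd = sc.sequenceFile(
    "file:///Test.seq",
    keyClass="ABC",                        # use fully qualified names if the classes live in a package
    valueClass="ABC",
    keyConverter="DataToChunkConverter",
    valueConverter="DataToChunkConverter")

# Array[Byte] arrives on the Python side as raw bytes (a bytes/bytearray object),
# so it can be inspected directly.
for key_bytes, value_bytes in rdd.take(5):
    print(len(key_bytes), len(value_bytes))

With a primitive return type like Array[Byte], the Kryo registration flags from the earlier attempt should no longer be needed, since the converter output handed to Pyrolite is a plain Java byte array rather than a BytesWritable.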
On the topic of reading custom sequence files in Spark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42687835/