java - 在 Spark 中读取自定义序列文件

标签 java scala apache-spark hadoop

我在 Hadoop 中有一个自定义可写类,它保存为序列文件,如下所示

   public class ABC implements Writable{
    private byte[] myId;
    private byte[] myType;

    //Constructor and other methods
    @Override
    public void write(DataOutput out) throws IOException {
        myId.write(out);
        myType.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        myId.readFields(in);
        myType.readFields(in);
    }
}
我想使用 PySpark 读取序列文件并获取数据。我尝试了以下三种方法:
  • 直接阅读:

  • sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC")
    但得到
    object not serializable (class: ABC, value: ABC@451de3ec)
    
  • 写转换器:

  • 来自官方教程http://spark.apache.org/docs/latest/programming-guide.html#external-datasets ,它说

    If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to transform that data on the Scala/Java side to something which can be handled by Pyrolite’s pickler. A Converter trait is provided for this. Simply extend this trait and implement your transformation code in the convert method.


    因此,我实现了一个转换器,如下所示:
    import test.ABC
    import java.io.DataInput
    import org.apache.spark.api.python.Converter
    
    /**
     * Implementation of [[org.apache.spark.api.python.Converter]] that converts data
     * to ABC
     */
    class DataToABCConverter extends Converter[Any, ABC] {
      override def convert(obj: Any): ABC = {
        if (obj == null) {
          return null
        }
        val in = obj.asInstanceOf[DataInput]
        val abc = new ABC()
        abc.readFields(in)
        abc
      }
    }
    
    在 PySpark 我使用以下代码
    sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC",  keyConverter="DataToABCConverter",  valueConverter="DataToABCConverter" )
    
    但是得到以下错误
    java.lang.ClassCastException: ABC cannot be cast to java.io.DataInput
    
    看起来转换器的输入是我的 ABC 类而不是 java.io.DataInput,所以我不能应用 readFields 方法来获取数据。
  • 使用 BytesWritable:

  • 我添加一个 geID()获取字节并更改转换器的方法如下:
    class DataToChunkConverter extends Converter[Any, BytesWritable] {
      override def convert(obj: Any): BytesWritable = {
        if (obj == null) {
          return null
        }
        val abc = obj.asInstanceOf[ABC]
        val idd = abc.getID()
        new BytesWritable(idd)
      }
    }
    
    比我运行 pyspark 使用
    pyspark --master=local[8] --conf "spark.kryo.classesToRegister=org.apache.hadoop.io.BytesWritable" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
    
    但是得到以下错误
    Failed to pickle Java object as value: BytesWritable, falling back
    to 'toString'. Error: couldn't pickle object of type class org.apache.hadoop.io.BytesWritable
    
    所以我的问题是在 PySpark 中读取自定义序列文件的正确方法是什么?什么类型可以通过 PySpark 序列化?任何建议表示赞赏!

    最佳答案

    经过一些实验(遵循第三种方法),事实证明如果使用scala或Java中的native类型作为转换器的返回类型,它就可以工作。

    例如,使用 Array[Byte]作为返回类型,Pyspark 可以成功获取数据:

     class DataToChunkConverter extends Converter[Any,  Array[Byte]] {
      override def convert(obj: Any):  Array[Byte] = {
        if (obj == null) {
          return null
        }
        val abc = obj.asInstanceOf[ABC] 
        val idd = abc.getID()
        idd
      }
    }
    

    关于java - 在 Spark 中读取自定义序列文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42687835/

    相关文章:

    java - 类不是抽象的,不会重写 KeyListener 中的抽象方法 keyReleased(KeyEvent)

    scala - 如何使用 Spark 从 svd 分量重建原始矩阵

    scala - 在 Windows 10 上安装 Spark 后出现问题

    python - 如何将 RDD 保存到单个 Parquet 文件?

    scala - 使用 spark 将 null 设置为 Hive 表中数字数据类型的值

    hadoop - 最小化Google Dataproc上Apache Spark作业的初始化时间的最佳方法是什么?

    java - 为 Java JDK 11 中可用的 TLS 1.3 实现 session 恢复

    Java 异常 java.lang.IllegalThreadStateException

    java - 两个静态整数相除始终返回 0% 比率

    java - 构建成功时出现奇怪的 Maven 错误消息