apache-spark - 从 Spark RDD 读取 Kryo 文件

标签 apache-spark kryo

我是 Spark 和 Scala 新手。

我需要读取并分析 Spark 中的一个文件,该文件是用 Kryo 序列化在我的 scala 代码中编写的:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

val kryo:Kryo = new Kryo()
val output:Output = new Output(new FileOutputStream("filename.ext",true))

//kryo.writeObject(output, feed) (tested both line)
kryo.writeClassAndObject(output, myScalaObject)

这是一个伪代码,用于创建一个序列化我的对象(myScalaObject)的文件,这是一个复杂的对象。

该文件看起来写得很好,但是当我在 Spark RDD 中读取它时出现问题

Spark 中的伪代码:

val conf = new SparkConf()
    .setMaster("local")
    .setAppName("My application")
    .set("spark.executor.memory", "1g")


conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "myScalaObject")

val sc = new SparkContext(conf)

val file=sc.objectFile[myScalaObject]("filename.ext")

val counts = file.count()

当我尝试执行它时,我收到此错误:

org.apache.spark.SparkException: 
Job aborted: Task 0.0:0 failed 1 times (most recent failure: 
Exception failure: java.io.IOException: file: filename.ext not a SequenceFile)

可以在 Spark 中读取这种类型的文件吗?

如果这个解决方案不可能,那么创建复杂文件结构以在 Spark 中读取的好解决方案是什么?

谢谢

最佳答案

如果要使用objectFile读取,请使用saveAsObjectFile写出数据。

val myObjects: Seq[MyObject] = ...
val rddToSave = sc.parallelize(myObjects) // Or better yet: construct as RDD from the start.
rddToSave.saveAsObjectFile("/tmp/x")
val rddLoaded = sc.objectFile[MyObject]("/tmp/x")

或者,正如 zsxwing 所说,您可以创建文件名的 RDD,并使用 map 读取每个文件的内容。如果希望将每个文件读入单独的分区,请将文件名并行化到单独的分区中:

def loadFiles(filenames: Seq[String]): RDD[Object] = {
  def load(filename: String): Object = {
    val input = new Input(new FileInputStream(filename))
    return kryo.readClassAndObject(input)
  }
  val partitions = filenames.length
  return sc.parallelize(filenames, partitions).map(load)
}

关于apache-spark - 从 Spark RDD 读取 Kryo 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23617783/

相关文章:

python - Pyspark - 将 json 字符串转换为 DataFrame

java - Spark - 使用不可序列化的成员序列化对象

Java Kryonet [类未注册异常]

python - (Py)Spark中如何使用JDBC源读写数据?

java - 如何在 Spark Java 中使用 StructType Schema 从 JavaRDD<String> 读取 csv 格式数据

java - KryoNet:客户端连接后立即断开连接

java - Kryonet RMI 抛出异常 => 循环 (StackOverflowError)

java - Kryo 序列化程序在底层 Scala 类 WrappedArray 上导致异常

java - apache Spark服务器的哪个节点从磁盘读取节点

python - 在 pyspark 中不使用 pivot 进行分组的有效方法