我正在尝试在 Spark 中使用 json4s 的案例类提取功能,
即调用jvalue.extract[MyCaseClass]
.如果我带上 JValue
就可以了对象进入主人并在那里进行提取,但同样的调用在 worker 中失败:
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}
val sqx = sqlContext
val data = sc.textFile(inpath).coalesce(2000)
case class PageView(
client: Option[String]
)
def extract(json: JValue) = {
implicit def formats = org.json4s.DefaultFormats
Try(json.extract[PageView]).toOption
}
val json = data.map(parse(_)).sample(false, 1e-6).cache()
// count initial inputs
val raw = json.count
// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size
// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero
// this throws "org.json4s.package$MappingException: Parsed JSON values do not match with class constructor"
json.map(x => {implicit def formats = org.json4s.DefaultFormats; x.extract[PageView]}).count
Formats
的隐含在 extract
中本地定义函数,因为 DefaultFormats 不可序列化并且在顶层定义它导致它被序列化以传输给工作人员而不是在那里构造。我认为问题仍然与DefaultFormats
的远程初始化有关。 ,但我不确定它是什么。当我调用
extract
直接方法,我的extract
函数,就像在上一个示例中一样,它不再提示序列化,而只是抛出 JSON 与预期结构不匹配的错误。当分配给 worker 时,我怎样才能让提取工作?
编辑
@WesleyMiao 重现了这个问题,发现它是 spark-shell 特有的。他报告说此代码作为独立应用程序工作。
最佳答案
在 spark-shell 中运行代码时,我遇到了与您相同的异常。但是,当我将您的代码变成真正的 spark 应用程序并将其提交到独立的 spark 集群时,我毫无异常(exception)地得到了预期的结果。
下面是我在一个简单的 spark 应用程序中输入的代码。
val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))
val json = data.map(parse(_))
val dist = json.mapPartitions { jsons =>
implicit val formats = org.json4s.DefaultFormats
jsons.map(_.extract[PageView])
}
dist.collect() foreach println
当我使用 spark-submit 运行它时,我得到了以下结果。
PageView(Some(Michael))
PageView(Some(Wesley))
而且我也确信它不是在“本地[*]”模式下运行的。
现在我怀疑我们在 spark-shell 中运行时出现异常的原因与 spark-shell 中的案例类 PageView 定义以及 spark-shell 如何将其序列化/分发给执行程序有关。
关于apache-spark - 从 spark-shell 中的分布式操作调用 `JValue.extract` 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30744294/