apache-spark - 从 spark-shell 中的分布式操作调用 `JValue.extract` 时出错

标签 apache-spark json4s

我正在尝试在 Spark 中使用 json4s 的案例类提取功能,
即调用jvalue.extract[MyCaseClass] .如果我带上 JValue 就可以了对象进入主人并在那里进行提取,但同样的调用在 worker 中失败:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}

val sqx = sqlContext

val data = sc.textFile(inpath).coalesce(2000)

case class PageView(
 client:  Option[String]
)

def extract(json: JValue) = {
  implicit def formats = org.json4s.DefaultFormats
  Try(json.extract[PageView]).toOption
}

val json = data.map(parse(_)).sample(false, 1e-6).cache()

// count initial inputs
val raw = json.count 


// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size

// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero

// this throws  "org.json4s.package$MappingException: Parsed JSON values do not match with class constructor"
json.map(x => {implicit def formats = org.json4s.DefaultFormats; x.extract[PageView]}).count
Formats 的隐含在 extract 中本地定义函数,因为 DefaultFormats 不可序列化并且在顶层定义它导致它被序列化以传输给工作人员而不是在那里构造。我认为问题仍然与DefaultFormats的远程初始化有关。 ,但我不确定它是什么。

当我调用 extract直接方法,我的extract函数,就像在上一个示例中一样,它不再提示序列化,而只是抛出 JSON 与预期结构不匹配的错误。

当分配给 worker 时,我怎样才能让提取工作?

编辑

@WesleyMiao 重现了这个问题,发现它是 spark-shell 特有的。他报告说此代码作为独立应用程序工作。

最佳答案

在 spark-shell 中运行代码时,我遇到了与您相同的异常。但是,当我将您的代码变成真正的 spark 应用程序并将其提交到独立的 spark 集群时,我毫无异常(exception)地得到了预期的结果。

下面是我在一个简单的 spark 应用程序中输入的代码。

val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))

val json = data.map(parse(_))

val dist = json.mapPartitions { jsons =>
  implicit val formats = org.json4s.DefaultFormats
  jsons.map(_.extract[PageView])
}

dist.collect() foreach println

当我使用 spark-submit 运行它时,我得到了以下结果。
PageView(Some(Michael))                                                                                                                                       
PageView(Some(Wesley))

而且我也确信它不是在“本地[*]”模式下运行的。

现在我怀疑我们在 spark-shell 中运行时出现异常的原因与 spark-shell 中的案例类 PageView 定义以及 spark-shell 如何将其序列化/分发给执行程序有关。

关于apache-spark - 从 spark-shell 中的分布式操作调用 `JValue.extract` 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30744294/

相关文章:

scala - reduceBykey Spark 维护秩序

Scala json4s 密封特征作为枚举

json - 检查对象在 json4s/lift-json 中是否有字段

json - 如何使用json4s从json数组解析和提取信息

scala - 使用 json4s 序列化带有特征混合的案例类

mongodb - ReactiveMongo 和 JSON4S

scala - 从 Kafka 主题读取数据并使用 scala 和 spark 写回 Kafka 主题

apache-spark - 如何修复Windows上的spark-shell(失败并显示 "was unexpected at this time")?

apache-spark - Spark Redshift 连接器 : combine saving to redshift with a delete query

sql - Spark SQL 复杂条件窗口函数