mongodb - 将 mongodb oplog.rs 加载到 spark dataframe

标签 mongodb scala apache-spark spark-dataframe

我正在尝试将 MongoDB 中的 oplog.rs 加载到 spark DataFrame 中,它加载了元数据并通过 printSchema 函数对其进行了验证,但是当我尝试执行诸如 show 或 count 之类的操作时它给了我这个错误 scala.MatchError: ((BsonMinKey,null),0) (of class scala.Tuple2)。我也尝试将其注册为 temptable,但仍然出现相同的错误。

val customReadConfig = ReadConfig(Map(
  "uri" -> 
    "mongodb://username:password@host_name:port/local.oplog.rs?authSource=xxxxx"
))

val dataframe = sqlContext.read.format("com.mongodb.spark.sql").
  options(customReadConfig.asOptions).load

最佳答案

为了后代:

Mongo >= 3.2 版本的默认分区器是 MongoSamplePartitioner,它使用(像所有其他分区器一样)partitionKey 并且在创建分区时使用BsonMinKeyBsonMaxKey 来定义每个分区的边界。您遇到的匹配错误可能发生在此处:

  def createPartitions(partitionKey: String, splitKeys: Seq[BsonValue], 
      locations: Seq[String] = Nil, addMinMax: Boolean = true): 
      Array[MongoPartition] = {
        val minKeyMaxKeys = (new BsonMinKey(), new BsonMaxKey())
        val minToMaxSplitKeys: Seq[BsonValue] = if (addMinMax) minKeyMaxKeys._1 +: splitKeys :+ minKeyMaxKeys._2 else splitKeys
        val minToMaxKeysToPartition = if (minToMaxSplitKeys.length == 1) minToMaxSplitKeys else minToMaxSplitKeys.tail
        val partitionPairs: Seq[(BsonValue, BsonValue)] = minToMaxSplitKeys zip minToMaxKeysToPartition
        partitionPairs.zipWithIndex.map({
           case ((min: BsonValue, max: BsonValue), i: Int) => MongoPartition(i, createBoundaryQuery(partitionKey, min, max), locations)
      }).toArray
}

该错误告诉您的是您的 max 被设置为 null,正如您在代码中看到的那样,只处理了一种情况。如果您没有设置要使用的partitionKey,分区程序将默认使用_id,您可以阅读它here

默认情况下,oplog.rs 集合没有_id 键,oplog 记录的唯一id 是惊人的h ,它是一个数字。因此,为了让分区程序做正确的事情,您需要在 SparkConfReadConfig 中设置 spark.mongodb.input.partitionerOptions.partitionKeyh

 new SparkConf()
   //all of your other settings
   .set("spark.mongodb.input.partitionerOptions.partitionKey", "h")

关于mongodb - 将 mongodb oplog.rs 加载到 spark dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42584984/

相关文章:

scala - 这个功能可以用 Haskell 的类型系统实现吗?

python - 为什么 DataFrame 中缺少分区键列

docker - kubernetes上的 Spark :预期为HTTP 101响应,但为 '403 Forbidden'

apache-spark - Apache Sqoop 和 Spark

javascript - 使用node.js异步系列插入错误

python - PyMongo - 将属性中的所有值设置为小写

mongodb - 带 lift-json 的默认案例类参数

scala - 如何使用 spark-shell 导入自己的 scala 包?

mongodb - 如何在 MongoDB 集合中查找与给定条件匹配的文档和单个子文档

python - MongoDB 和 Meteor 中隐私设置的理想模式设计