我正在尝试将 MongoDB 中的 oplog.rs 加载到 spark DataFrame 中,它加载了元数据并通过 printSchema
函数对其进行了验证,但是当我尝试执行诸如 show 或 count 之类的操作时它给了我这个错误 scala.MatchError: ((BsonMinKey,null),0) (of class scala.Tuple2)
。我也尝试将其注册为 temptable,但仍然出现相同的错误。
val customReadConfig = ReadConfig(Map(
"uri" ->
"mongodb://username:password@host_name:port/local.oplog.rs?authSource=xxxxx"
))
val dataframe = sqlContext.read.format("com.mongodb.spark.sql").
options(customReadConfig.asOptions).load
最佳答案
为了后代:
Mongo >= 3.2 版本的默认分区器是 MongoSamplePartitioner
,它使用(像所有其他分区器一样)partitionKey
并且在创建分区时使用BsonMinKey
和 BsonMaxKey
来定义每个分区的边界。您遇到的匹配错误可能发生在此处:
def createPartitions(partitionKey: String, splitKeys: Seq[BsonValue],
locations: Seq[String] = Nil, addMinMax: Boolean = true):
Array[MongoPartition] = {
val minKeyMaxKeys = (new BsonMinKey(), new BsonMaxKey())
val minToMaxSplitKeys: Seq[BsonValue] = if (addMinMax) minKeyMaxKeys._1 +: splitKeys :+ minKeyMaxKeys._2 else splitKeys
val minToMaxKeysToPartition = if (minToMaxSplitKeys.length == 1) minToMaxSplitKeys else minToMaxSplitKeys.tail
val partitionPairs: Seq[(BsonValue, BsonValue)] = minToMaxSplitKeys zip minToMaxKeysToPartition
partitionPairs.zipWithIndex.map({
case ((min: BsonValue, max: BsonValue), i: Int) => MongoPartition(i, createBoundaryQuery(partitionKey, min, max), locations)
}).toArray
}
该错误告诉您的是您的 max
被设置为 null,正如您在代码中看到的那样,只处理了一种情况。如果您没有设置要使用的partitionKey
,分区程序将默认使用_id
,您可以阅读它here
默认情况下,oplog.rs
集合没有_id
键,oplog 记录的唯一id 是惊人的h
,它是一个数字。因此,为了让分区程序做正确的事情,您需要在 SparkConf
或 ReadConfig
中设置 spark.mongodb.input.partitionerOptions.partitionKey
到 h
。
new SparkConf()
//all of your other settings
.set("spark.mongodb.input.partitionerOptions.partitionKey", "h")
关于mongodb - 将 mongodb oplog.rs 加载到 spark dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42584984/