mongodb - 使用 MongoSpark 更新 mongoData

标签 mongodb apache-spark

来自Mongo提供的以下教程:

MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))

我是否正确理解实际上发生的事情是 Mongo 首先删除集合,然后用新数据覆盖该集合?

我的问题是,是否可以使用 MongoSpark 连接器实际更新 Mongo 中的记录,

假设我有看起来像这样的数据

{"_id" : ObjectId(12345), "name" : "John" , "Occupation" : "Baker"}

然后我想做的是从另一个包含更多详细信息的文件中合并此人的记录,即 I.E.该文件看起来像

{"name" : "John", "address" : "1800 some street"}

目标是更新 Mongo 中的记录,所以现在 JSON 看起来像

{"_id" : ObjectId(12345) "name" : "John" , "address" : 1800 some street", "Occupation" : "Baker"}

事情是这样的,假设我们只想更新 John,并且还有数百万条其他记录我们希望保持原样。

最佳答案

这里有几个问题,我将尝试分解它们。

What is essentially happening is that Mongo is first dropping the collection, and then its overwritting that collection with the new data?

正确,截至 mongo-spark v2.x,如果指定模式 overwriteMongoDB Connector for Spark将首先删除集合并将新结果保存到集合中。参见 source snippet了解更多信息。

My question is then is it possible to use the MongoSpark connector to actually update records in Mongo,

SPARK-66 中描述的补丁(mongo-spark v1.1+) 是,如果数据帧包含 _id 字段,数据将为 upserted .这意味着任何具有相同 _id 值的现有文档都将被更新,并且集合中没有现有 _id 值的新文档将被插入。

What I would then like to do is to merge the record of the person from another file that has more details

如上所述,您需要知道集合中的 _id 值。示例步骤:

  1. 通过读取 Person 集合来检索 John_id 值来创建数据框 (A)。即 ObjectId(12345)
  2. ObjectId(12345)_id 值合并到您的数据框中(B - 来自包含更多信息的其他文件)。利用唯一字段值连接两个数据帧(A 和 B)。
  3. 保存合并后的数据框 (C)。不指定 overwrite 模式。

we just want to update John, and that there are millions of other records that we would like to leave as is.

在这种情况下,在合并两个数据帧之前,从数据帧 B(来自另一个文件的具有更多详细信息的记录)中过滤掉任何不需要的记录。此外,当您调用 save() 时,指定模式 append

关于mongodb - 使用 MongoSpark 更新 mongoData,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41624850/

相关文章:

sql - 如何在 Spark SQL 中对分解字段进行 GROUP BY?

mongodb - Kubernetes/Rancher 2,带有本地存储卷部署的 mongo-replicaset

mongodb - Mongo 不均匀的分片分布

node.js - 如何将 MongoDB findOne() 与 mongoose 一起使用

python - 在 Spark 和 Python 中编写 flatMap 函数

apache-spark - Spark 结构化流恰好一次 - 未实现 - 重复事件

sorting - MongoDB 索引和自然排序优化

node.js - 如何将npm包中的node.js对象存储在mongodb中

apache-spark - UDF 原因警告 : CachedKafkaConsumer is not running in UninterruptibleThread (KAFKA-1894)

python - 从 S3 将嵌套文本文件读入 spark 时出现内存错误