我正在使用 hadoop map-reduce 来处理 XML 文件。我直接将JSON数据存储到mongodb中。
如何实现在执行BulkWriteOperation
之前只将不重复的记录存储到数据库中?
重复记录标准将基于产品图片和产品名称,我不想使用吗啡层,我们可以在其中为类成员分配索引.
这是我的 reducer 类:
public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{
private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);
protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
LOGGER.info("reduce()------Start for key>"+key);
Map<String,String> insertProductInfo = new HashMap<String,String>();
try{
MongoClient mongoClient = new MongoClient("localhost", 27017);
DB db = mongoClient.getDB("test");
BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
for (MapWritable entry : values) {
for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
}
if(!insertProductInfo.isEmpty()){
BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
operation.insert(basicDBObject);
}
}
//How can I check for duplicates before executing bulk operation
operation.execute();
LOGGER.info("reduce------end for key"+key);
}catch(Exception e){
LOGGER.error("General Exception in XMLReducer",e);
}
}
}
编辑:在我添加的建议答案之后:
BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
.append("product_name", basicDBObject.get("product_name"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
operation.insert(basicDBObject);
我收到如下错误:com.mongodb.MongoInternalException: no mapping found for index 0
任何帮助都会有用。谢谢。
最佳答案
我想这完全取决于您真正想对这里的“重复项”做什么以及如何处理它。
对于一个你总是可以使用.initializeUnOrderedBulkOperation()
这不会在您的索引中的重复键上“出错”(您需要停止重复),但会在返回的 BulkWriteResult
中报告任何此类错误目的。这是从 .execute()
BulkWriteResult result = operation.execute();
另一方面,您可以只使用“upserts”,并使用 $setOnInsert
等运算符仅在不存在重复项的情况下进行更改:
BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));
因此,您基本上是通过查询查找包含“键”的字段的值来确定重复项,然后实际上只更改未找到“键”的任何数据,因此更改新文档并“插入” .
无论哪种情况,此处的默认行为都是“插入”第一个唯一“键”值,然后忽略所有其他事件。如果你想在找到相同键的地方做其他事情,比如“覆盖”或“增加”值,那么 .update()
“upsert”方法就是你想要的,但你会使用其他update operators对于这些操作。
关于java - 使用 hadoop reducer 在将批量写入操作写入 mongodb 时检查重复记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31065905/