java - 使用 hadoop reducer 在将批量写入操作写入 mongodb 时检查重复记录

标签 java mongodb hadoop mapreduce bulkinsert

我正在使用 hadoop map-reduce 来处理 XML 文件。我直接将JSON数据存储到mongodb中。
如何实现在执行BulkWriteOperation之前只将不重复的记录存储到数据库中?

重复记录标准将基于产品图片和产品名称,我不想使用吗啡层,我们可以在其中为类成员分配索引.

这是我的 reducer 类:

public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{

private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);    

protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
    LOGGER.info("reduce()------Start for key>"+key);
    Map<String,String> insertProductInfo = new HashMap<String,String>();
    try{
        MongoClient mongoClient = new MongoClient("localhost", 27017);
        DB db = mongoClient.getDB("test");
        BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
        for (MapWritable entry : values) {
             for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
                    insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
                }
             if(!insertProductInfo.isEmpty()){
                 BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
                 operation.insert(basicDBObject);
             }          
        }
        //How can I check for duplicates before executing bulk operation
        operation.execute();
        LOGGER.info("reduce------end for key"+key);
    }catch(Exception e){
        LOGGER.error("General Exception in XMLReducer",e);
    }
  } 
}

编辑:在我添加的建议答案之后:

 BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
                 .append("product_name", basicDBObject.get("product_name"));
                 operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
 operation.insert(basicDBObject);

我收到如下错误:com.mongodb.MongoInternalException: no mapping found for index 0

任何帮助都会有用。谢谢。

最佳答案

我想这完全取决于您真正想对这里的“重复项”做什么以及如何处理它。

对于一个你总是可以使用.initializeUnOrderedBulkOperation()这不会在您的索引中的重复键上“出错”(您需要停止重复),但会在返回的 BulkWriteResult 中报告任何此类错误目的。这是从 .execute()

返回的
BulkWriteResult result = operation.execute();

另一方面,您可以只使用“upserts”,并使用 $setOnInsert 等运算符仅在不存在重复项的情况下进行更改:

BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));

operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));

因此,您基本上是通过查询查找包含“键”的字段的值来确定重复项,然后实际上只更改未找到“键”的任何数据,因此更改新文档并“插入” .

无论哪种情况,此处的默认行为都是“插入”第一个唯一“键”值,然后忽略所有其他事件。如果你想在找到相同键的地方做其他事情,比如“覆盖”或“增加”值,那么 .update() “upsert”方法就是你想要的,但你会使用其他update operators对于这些操作。

关于java - 使用 hadoop reducer 在将批量写入操作写入 mongodb 时检查重复记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31065905/

相关文章:

MongoDB + Laravel + jenssegers/laravel-mongodb + 更新嵌套子元素

hadoop - S3NativeFileSystem 调用是否会终止我在 AWS EMR 4.6.0 上的 Pyspark 应用程序

scala - 将 S3(法兰克福)与 Spark 结合使用

java - 将 double 值舍入为 2 个小数位

java - 如果 is=false,Java 是否浪费时间检查 "if(isOK && conditionA)"中的 conditionS?

java - 如何在Java中访问JSP隐式对象,例如(请求,响应)

hadoop - elasticsearch 2.3.3 中的日期问题

java - 为什么这种方法不起作用?

c# - MongoDB C# 驱动程序 : connection string for sharding over replica set

使用 Gson 将 Java 日期转为 MongoDB 日期