Spring Data Mongodb - 如何进行批量更新插入

标签 spring mongodb spring-boot spring-data-mongodb bulkupdate

我有一个相同集合的对象/域列表,如果 Mongodb 数据库中不存在,则应将其插入,否则应按 _id 更新现有记录过滤器。

虽然这可以使用 Spring data MongoRepositories 来完成,但它似乎是:

  1. 太慢了!!!(可能是一条一条保存记录!!!这个操作是Bulk执行的吗???)
  2. 如果有任何DuplicateKeyException,它会终止执行(不保存下一条记录,尽管我想忽略此类异常)

现在,如果我使用 BulkOperationsbulkOps = mongoTemplate.bulkOps(BulkMode.UNORDERED, collectionName),如果完整性破坏,它仍然会抛出相同的异常! 尽管根据 Spring 文档,使用 BulkMode.UNORDERED:

Perform bulk operations in parallel. Processing will continue on errors.

考虑下面的代码:

BulkOperations bulkOps = mongoTemplate.bulkOps(BulkMode.UNORDERED, className);

        for (T entry : entries) {
            DBObject dbDoc = new BasicDBObject();
            mongoTemplate.getConverter().write(entry, dbDoc);
            Update update = Update.fromDBObject(dbDoc);
            bulkOps.updateOne(new Query(Criteria.where("_id").is(dbDoc.get("_id"))), update);
        }
        BulkWriteResult result = bulkOps.execute();

当我执行上面的代码时,出现以下异常:

java.lang.IllegalArgumentException: Invalid BSON field name _class
at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:516)
at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:129)
at com.mongodb.DBObjectCodec.encode(DBObjectCodec.java:61)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:85)
at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:43)
at com.mongodb.connection.BaseWriteCommandMessage.encodeMessageBodyWithMetadata(BaseWriteCommandMessage.java:129)
at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
at com.mongodb.connection.WriteCommandProtocol.sendMessage(WriteCommandProtocol.java:220)
at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:101)
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:64)
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:37)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
at com.mongodb.connection.DefaultServerConnection.updateCommand(DefaultServerConnection.java:143)
at com.mongodb.operation.MixedBulkWriteOperation$Run$3.executeWriteCommandProtocol(MixedBulkWriteOperation.java:490)
at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:656)
at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:409)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:177)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:426)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:417)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
at com.mongodb.Mongo.execute(Mongo.java:845)
at com.mongodb.Mongo$2.execute(Mongo.java:828)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2309)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2302)
at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:121)
at org.springframework.data.mongodb.core.DefaultBulkOperations.execute(DefaultBulkOperations.java:278)
at com.nayapay.biller.service.UtilityService.saveInBulk(UtilityService.java:122)
at com.nayapay.biller.service.xxx.xxx(xxx.java:447)
at com.nayapay.biller.service.xxx.xxx(xxx.java:172)
at com.nayapay.biller.controller.xxx.updateBatch(xxx.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:967)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:901)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:661)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:317)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:331)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:214)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:80)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:799)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1457)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)


at java.lang.Thread.run(Unknown Source)
[ERROR] 2017-11-16 10:52:26.904 [http-nio-127.0.0.1-8084-exec-10] ExceptionController - Error occured: Invalid BSON field name _class

如果我排除_id和/或_class,其他字段也会发生同样的异常!!

所以我的问题是:

  1. MongoRepository 的 save(Iteratable list) 方法是否批量执行?
  2. 如何批量更新插入列表(无需查看文档是否已存在)
  3. 如何忽略数据完整性异常,例如 DuplicateKeyException

注意:文档的ID是由应用程序设置的,而不是由数据库设置的(将有助于过滤目的)

最佳答案

你可以尝试一下,希望它会起作用!

final BulkWriteOperation bulkOps = mongoTemplate.getCollection(className).initializeUnorderedBulkOperation();

        entries.stream().filter(entry -> entry != null).forEach(entry -> {
            DBObject dbDoc = new BasicDBObject();
            mongoTemplate.getConverter().write(entry, dbDoc);
            dbDoc.removeField("_id");
            BulkWriteRequestBuilder bulkWriteRequestBuilder = bulkOps.find(criteria.apply(dbDoc).getCriteriaObject());
            BulkUpdateRequestBuilder upsertReq = bulkWriteRequestBuilder.upsert();
            upsertReq.replaceOne(dbDoc);
        });

        BulkWriteResult result = bulkOps.execute();

关于Spring Data Mongodb - 如何进行批量更新插入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47322686/

相关文章:

mongodb - 在 MongoDB 聚合框架中将列合并为一个

java - 我如何知道 ArrayList 是否包含 mongoDB 中的特定项目?

spring - 使用 Swagger/OpenAPI 文档模拟 Rest API

java - 查询从 mongoDB 获取结果

Spring Boot 和 JSF/Primefaces/Richfaces

java - 如何知道 Spring Boot 中的 webjars 路径。如何获取 webjars 中所有可用文件的路径

java - webflux spring boot 应用程序的过滤器,返回 ResponseEntity<?>

java - SpringBoot v2.7.0 - Select-Query without transactional-annotation => No transactional EntityManager available |现在有了 github-example

php - mongodb从不同的数据库中选择

java - 如何更改本地主机 :8080 to www. myWebApp.com?