尝试使用 MongoDB BulkWriteOperation 时出现 java.lang.IllegalStateException

标签 java mongodb exception queue

我有这段代码,一旦 ArrayBlockingQueue 填满它的配额,它就会将文档转储到 MongoDB 中。当我运行代码时,它似乎只运行一次,然后给我一个堆栈跟踪。我的猜测是 BulkWriteOperation 必须“重置”或重新开始。

此外,我在构造函数中创建了 BulkWriteOperations...

bulkEvent = eventsCollection.initializeOrderedBulkOperation();
bulkSession = sessionsCollection.initializeOrderedBulkOperation();

这是堆栈跟踪。

10 records inserted
java.lang.IllegalStateException: already executed
    at org.bson.util.Assertions.isTrue(Assertions.java:36)
    at com.mongodb.BulkWriteOperation.insert(BulkWriteOperation.java:62)
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.dumpQueue(MongoDBManager.java:104)
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.addToQueue(MongoDBManager.java:85)

这是队列的代码:

public void addToQueue(Object item) {
        if (item instanceof SakaiEvent) {
            if (eventQueue.offer((SakaiEvent) item)) {
            } else {
                dumpQueue(eventQueue);
            }

        }
        if (item instanceof SakaiSession) {
            if (sessionQueue.offer((SakaiSession) item)) {
            } else {
                dumpQueue(sessionQueue);
            }
        }

    }

下面是从队列中读取并将它们添加到 BulkWriteOperation (initializeOrderedBulkOperation) 以执行它然后将其转储到数据库的代码。只写入了 10 个文档,然后就失败了。

private void dumpQueue(BlockingQueue q) {
        Object item = q.peek();
        Iterator itty = q.iterator();
        BulkWriteResult result = null;

        if (item instanceof SakaiEvent) {
            while (itty.hasNext()) {
                bulkEvent.insert(((SakaiEvent) itty.next()).convertToDBObject());
                //It's failing at that line^^
            }
            result = bulkEvent.execute();

        }
        if (item instanceof SakaiSession) {
            while (itty.hasNext()) {
                bulkSession.insert(((SakaiSession) itty.next()).convertToDBObject());
            }
            result = bulkSession.execute();
        }

        System.out.println(result.getInsertedCount() + " records inserted");
    }

最佳答案

general documentation适用于这种情况下的所有驱动程序实现:

"After execution, you cannot re-execute the Bulk() object without reinitializing."

因此 .execute() 方法有效地“耗尽”了当前发送给它的操作列表,现在包含有关命令实际发送方式的状态信息。因此,您不能在不重新初始化的情况下在同一实例上添加更多条目或再次调用 .execute()

因此在对每个“批量”对象调用执行之后,您需要再次调用初始化:

bulkEvent = eventsCollection.initializeOrderedBulkOperation();
bulkSession = sessionsCollection.initializeOrderedBulkOperation();

这些行中的每一行都在您的函数中的每个 .execute() 调用之后分别重新放置。然后进一步调用这些实例可以添加操作并再次调用执行继续循环。

请注意,“批量”操作对象将存储您想要放入其中的项目,但会将对服务器的请求分解为最多 1000 个项目。执行后,操作列表的状态将准确反射(reflect)这是如何完成的,如果您想检查它的话。

关于尝试使用 MongoDB BulkWriteOperation 时出现 java.lang.IllegalStateException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25814729/

相关文章:

java - log4j - 写入/记录到多个日志文件取决于应用程序

java - 与 mongo db 不同的值?

javascript - 如何在 mongodb 的 angularjs 中使用变量模式

java - 模拟 toURI() 方法时出现 AssertionFailedError

java - 如何计算嵌套for循环的时间复杂度?

java - 在 Java 中从 ArrayList 打印不同的对象数据

java - 谷歌云数据流: Submitted job is executing but using old code

mongodb - 根据其他字段的值投影数组中存在的特定字段

c++ - what() 未打印的重新抛出异常的自定义错误消息

php - Controller 能否捕获模型抛出的异常?