java - Closing a MongoDB change stream causes an exception

Tags: java mongodb changestream

This simple example causes an exception:

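// Imports assume the MongoDB Reactive Streams driver, which the
// Subscriber/Subscription types used below suggest.
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// The class name is a placeholder; the original snippet showed only main().
public class ChangeStreamCancelRepro {

    public static void main(String[] args) throws InterruptedException {

        MongoClient mongoClient = MongoClients.create("mongodb://localhost");
        MongoDatabase db = mongoClient.getDatabase("import");
        MongoCollection<Document> gameEntityCollection = db.getCollection("gameEnitites");
        gameEntityCollection.watch().subscribe(new Subscriber<ChangeStreamDocument<Document>>() {

            @Override
            public void onSubscribe(Subscription s) {
                // Ask for one change event...
                s.request(1);
                // ...then cancel from another thread 100 ms later,
                // before that event has been delivered.
                new Thread(() -> {
                    try {
                        Thread.sleep(100);
                        s.cancel();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }

            @Override
            public void onNext(ChangeStreamDocument<Document> t) {
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
            }
        });

        // Keep the JVM alive long enough for the exception to be logged.
        Thread.sleep(10000);
    }
}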

The exception appears to be harmless and is logged asynchronously.

com.mongodb.MongoException: state should be: open
    at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:135) ~[mongodb-driver-async-3.11.1.jar:na]
    at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:93) ~[mongodb-driver-async-3.11.1.jar:na]
    at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85) ~[mongodb-driver-async-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:133) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:129) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java:168) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java:159) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:331) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:310) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:242) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:83) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:401) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) ~[mongodb-driver-core-3.11.1.jar:na]
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127) ~[na:na]
    at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:219) ~[na:na]
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: java.lang.IllegalStateException: state should be: open
    at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.session.BaseClientSessionImpl.advanceOperationTime(BaseClientSessionImpl.java:107) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.session.ClientSessionContext.advanceOperationTime(ClientSessionContext.java:70) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.ClusterClockAdvancingSessionContext.advanceOperationTime(ClusterClockAdvancingSessionContext.java:76) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.updateSessionContext(InternalStreamConnection.java:537) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.access$800(InternalStreamConnection.java:76) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:385) ~[mongodb-driver-core-3.11.1.jar:na]
    ... 13 common frames omitted

What can I do about it?

Best answer

I had a similar problem with Scala code and spent some time investigating. My conclusion is that this is a bug in the driver.

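The GitHub repository at https://github.com/eugeneatnezasa/unclosableMongoDBChangeStream contains an example of how to work around the problem, although it is written in Scala. The idea is that the driver collects and delivers the requested number of records (s.request(1), so 1 in your case) and ignores cancel until it has done so. To work around the problem, you therefore need to consume every record you requested before cancelling. It is strange, but at least you won't get the exception...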
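The "consume everything you requested, then cancel" idea can be sketched in Java roughly as follows. This is not the code from the linked repository, only a minimal illustration under some assumptions: DrainThenCancelSubscriber and its close() method are hypothetical names, and the JDK's java.util.concurrent.Flow interfaces stand in for org.reactivestreams.Subscriber and Subscription (which have the same method shapes) so that the sketch runs without the MongoDB driver on the classpath.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical helper, not part of any MongoDB API: it forwards cancel()
// only once every item it has requested has been delivered.
class DrainThenCancelSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<T> handler;
    private Flow.Subscription subscription;
    private long outstanding;        // items requested but not yet delivered
    private boolean closeRequested;  // close() has been called
    private boolean finished;        // cancelled, completed, or failed

    DrainThenCancelSubscriber(Consumer<T> handler) {
        this.handler = handler;
    }

    @Override
    public synchronized void onSubscribe(Flow.Subscription s) {
        subscription = s;
        if (closeRequested) {        // closed before the stream even started
            finished = true;
            s.cancel();
        } else {
            requestOne();
        }
    }

    @Override
    public synchronized void onNext(T item) {
        outstanding--;
        handler.accept(item);
        if (closeRequested) {
            cancelIfDrained();       // the owed item has arrived, safe to cancel
        } else {
            requestOne();
        }
    }

    @Override
    public synchronized void onError(Throwable t) {
        finished = true;
    }

    @Override
    public synchronized void onComplete() {
        finished = true;
    }

    /** Asks for shutdown; the actual cancel() may be deferred. */
    public synchronized void close() {
        closeRequested = true;
        cancelIfDrained();
    }

    public synchronized boolean isFinished() {
        return finished;
    }

    private void requestOne() {
        outstanding++;               // count the demand before signalling it
        subscription.request(1);
    }

    private void cancelIfDrained() {
        if (!finished && subscription != null && outstanding == 0) {
            finished = true;
            subscription.cancel();
        }
    }

    public static void main(String[] args) {
        DrainThenCancelSubscriber<String> sub =
                new DrainThenCancelSubscriber<>(item -> System.out.println("got " + item));
        // A stub subscription that only logs what the subscriber asks for.
        sub.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { System.out.println("request(" + n + ")"); }
            @Override public void cancel() { System.out.println("cancel()"); }
        });
        sub.close();          // deferred: one requested item is still owed
        sub.onNext("event");  // the owed item arrives, so cancel() is forwarded now
    }
}
```

With the real driver the same class would presumably implement org.reactivestreams.Subscriber<ChangeStreamDocument<Document>> instead. The trade-off is that close() cannot take effect until the next change event arrives, so on a quiet collection the stream stays open; whether that is acceptable depends on the application.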

Source: a similar question on Stack Overflow, https://stackoverflow.com/questions/58468428/
