Java 异步 MongoDB 驱动程序和 RxJava2 Observables

标签 java mongodb observable rx-java2 asyncmongo

我正在研究 RxJava2 的响应式(Reactive)编程,我对它与 MongoDB 等异步数据库驱动程序的使用有疑问。

如果我使用阻塞 MongoDB 驱动程序来获取集合,方法将是这样的:

public class MyDao{
   ...
   public Document getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      return collection.find().first();
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return Observable.just(myDao.getFirstDocument(collectionName)); 
   }
}

相反,使用 MongoDB 的异步驱动程序时,我的读取操作的返回类型是一个 void(而不是 Document 或 Future),其中包含回调方法,例如:

collection.find().first(
        (document, throwable) -> {
            myService.myCallback(document);
        }
);

那么,我如何将可观察文档传递给 MyService?

public class MyDao{
   ...
   public void getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      collection.find().first(
        (document, throwable) -> {
            //SOME SORT OF CALLBACK
        }
     );
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return ??????? 
   }
}

最佳答案

当您在

中使用Observable.just()
public Observable<Document> getFirstOf(String collectionName){
    return Observable.just(myDao.getFirstDocument(collectionName)); 
}

它等于下一个代码

public Observable<Document> getFirstOf(String collectionName){
    Document doc = myDao.getFirstDocument(collectionName);
    return Observable.just(doc); 
}

您可以注意到它不是异步代码,并且对数据库的请求是在调用线程上执行的。要使该代码异步,您需要像这样重写它

public Observable<Document> getFirstOf(String collectionName){
    return Observable.fromCallable(() -> myDao.getFirstDocument(collectionName)); 
}

如果您使用async MongoDB驱动程序并希望将其包装在Observable中,您可以这样编写

public Observable<Document> getFirstDocument(String collectionName) {
    return Observable.create(emitter -> {
        MongoCollection<Document> collection = database.getCollection(collectionName);
        collection.find().first((document, throwable) -> {
            if(document != null) {
                emitter.onNext(document);
                emitter.onComplete();
            } else if(throwable != null) {
                emitter.onError(throwable);
            }
        });
    });
}

关于Java 异步 MongoDB 驱动程序和 RxJava2 Observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53121418/

相关文章:

angular - 从 Angular 2 中的模型获取可观察值

angular - 如何从Angular2中的Observable中选择特定记录

java - 云形成 : What is a RegEx to match S3 bucket names that do not have periods (dots)

java - Spring忽略子模块集成测试中的bean

mongodb - 无法理解 MongoDB 和 RocksDB 之间的关系

java mongodb "geoNear"命令返回异常错误17304

ajax - 在不执行多个请求的情况下多次订阅 rxjs Observer

java - 在 Apache Camel 中上传到 ftp 后删除文件

java - 在java中执行url

node.js - 如何查找集合中 ID 位于数组中的项目?