我正在研究 RxJava2 的响应式(Reactive)编程,我对它与 MongoDB 等异步数据库驱动程序的使用有疑问。
如果我使用阻塞 MongoDB 驱动程序来获取集合,方法将是这样的:
public class MyDao{
...
public Document getFirstDocument(String collectionName){
MongoCollection<Document> collection = database.getCollection(collectionName);
return collection.find().first();
}
}
public class MyService {
...
public Observable<Document> getFirstOf(String collectionName){
return Observable.just(myDao.getFirstDocument(collectionName));
}
}
相反,使用 MongoDB 的异步驱动程序时,我的读取操作的返回类型是一个 void(而不是 Document 或 Future),其中包含回调方法,例如:
collection.find().first(
(document, throwable) -> {
myService.myCallback(document);
}
);
那么,我如何将可观察文档传递给 MyService?
public class MyDao{
...
public void getFirstDocument(String collectionName){
MongoCollection<Document> collection = database.getCollection(collectionName);
collection.find().first(
(document, throwable) -> {
//SOME SORT OF CALLBACK
}
);
}
}
public class MyService {
...
public Observable<Document> getFirstOf(String collectionName){
return ???????
}
}
最佳答案
当您在
中使用Observable.just()
时
public Observable<Document> getFirstOf(String collectionName){
return Observable.just(myDao.getFirstDocument(collectionName));
}
它等于下一个代码
public Observable<Document> getFirstOf(String collectionName){
Document doc = myDao.getFirstDocument(collectionName);
return Observable.just(doc);
}
您可以注意到它不是异步
代码,并且对数据库的请求是在调用线程上执行的。要使该代码异步
,您需要像这样重写它
public Observable<Document> getFirstOf(String collectionName){
return Observable.fromCallable(() -> myDao.getFirstDocument(collectionName));
}
如果您使用async
MongoDB驱动程序并希望将其包装在Observable
中,您可以这样编写
public Observable<Document> getFirstDocument(String collectionName) {
return Observable.create(emitter -> {
MongoCollection<Document> collection = database.getCollection(collectionName);
collection.find().first((document, throwable) -> {
if(document != null) {
emitter.onNext(document);
emitter.onComplete();
} else if(throwable != null) {
emitter.onError(throwable);
}
});
});
}
关于Java 异步 MongoDB 驱动程序和 RxJava2 Observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53121418/