场景:我有一个 customerID 字符串,用于查询多个不同的后端系统:日历、帮助台、ERP、CRM 等。我想编译一个报告。 所以我大致有(psydocode):
Result myResult = new Result();
Observable<Cal> cal = Calbackend.get(customerid);
cal.subscribe(calentry -> myResult.addCal(calentry));
Observable<Erp> erp = ERPbackend.get(customerid);
erp.subscribe(erpentry -> myResult.addErp(erpentry));
Observable<Help> help = Helpbackend.get(customerid);
help.subscribe(helpentry -> myResult.addHelp(helpentry));
Observable<Crm> crm = CRMbackend.get(customerid);
crm.subscribe(crmentry -> myResult.addCrm(crmentry));
// Magic here?
return result;
我正在考虑的方法:使用 defer()
来阻止启动,然后另外订阅每个 count()
。然后我可以压缩 count 元素,因为它们每个只会发出一个项目(而其他元素将有不同数量的事件)。但是,如果 myResult.add
的执行速度比 count()
慢,则可能会导致数据丢失。
我想到的另一个选项是为每个订阅设置一组 boolean 标志,并检查每个完成(和错误)事件(如果所有事件都已完成)并执行回调或对该事件使用阻塞。
或者有更好/推荐的方法吗?
最佳答案
运算符toList
可以与zip
一起使用,如下所示:
Observable<List<Cal>> cal = Calbackend.get(customerid).toList();
Observable<List<Erp>> erp = ERPbackend.get(customerid).toList();
Observable<List<Help>> help = Helpbackend.get(customerid).toList();
Observable<List<Crm>> crm = CRMbackend.get(customerid).toList();
Observable.zip(cal, erp, help, crm,
new Func4<List<Cal>, List<Erp>, List<Help>, List<Crm>, Result>() {
@Override
public Result call(List<Cal> cals, List<Erp> erps, List<Help> helps, List<Crm> crms) {
Result myResult = new Result();
// add all cals, erps, helps and crms to result
return myResult;
}
})
.subscribe(new Subscriber<Result>() {
@Override
public void onNext(Result result) {
// do something with the result
}
...
});
说明:顾名思义,toList
运算符创建由源可观察对象发出的项目列表(当源可观察对象发出时,该列表仅发出一次)完成),然后使用 zip
来组合可观察结果的结果。
编辑:如果这些 Observables 可能会发出错误,您可以使用 onErrorReturn
来保持正常流程:
Observable<List<Cal>> cal = Calbackend.get(customerid)
.onErrorReturn(new Func1<Throwable, Cal>() {
@Override
public Cal call(Throwable throwable) {
// Return something in the error case
return null;
}
})
.toList();
关于java - 等待多个可观察对象完成并返回不同数量的元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40188888/