java - 等待多个可观察对象完成并返回不同数量的元素

标签 java rx-java reactivex

场景:我有一个 customerID 字符串,用于查询多个不同的后端系统:日历、帮助台、ERP、CRM 等。我想编译一个报告。 所以我大致有(psydocode):

Result myResult = new Result();
Observable<Cal> cal = Calbackend.get(customerid);
cal.subscribe(calentry -> myResult.addCal(calentry));

Observable<Erp> erp = ERPbackend.get(customerid);
erp.subscribe(erpentry -> myResult.addErp(erpentry));

Observable<Help> help = Helpbackend.get(customerid);
help.subscribe(helpentry -> myResult.addHelp(helpentry));

Observable<Crm> crm = CRMbackend.get(customerid);
crm.subscribe(crmentry -> myResult.addCrm(crmentry));

// Magic here?

return result;

我正在考虑的方法:使用 defer() 来阻止启动,然后另外订阅每个 count() 。然后我可以压缩 count 元素,因为它们每个只会发出一个项目(而其他元素将有不同数量的事件)。但是,如果 myResult.add 的执行速度比 count() 慢,则可能会导致数据丢失。

我想到的另一个选项是为每个订阅设置一组 boolean 标志,并检查每个完成(和错误)事件(如果所有事件都已完成)并执行回调或对该事件使用阻塞。

我看过了herehere但这些示例涉及常量或数据类型。

或者有更好/推荐的方法吗?

最佳答案

运算符toList可以与zip一起使用,如下所示:

Observable<List<Cal>> cal = Calbackend.get(customerid).toList();
Observable<List<Erp>> erp = ERPbackend.get(customerid).toList();
Observable<List<Help>> help = Helpbackend.get(customerid).toList();
Observable<List<Crm>> crm = CRMbackend.get(customerid).toList();
Observable.zip(cal, erp, help, crm,
                new Func4<List<Cal>, List<Erp>, List<Help>, List<Crm>, Result>() {
                    @Override
                    public Result call(List<Cal> cals, List<Erp> erps, List<Help> helps, List<Crm> crms) {
                        Result myResult = new Result();
                        // add all cals, erps, helps and crms to result
                        return myResult;
                    }
                })
                .subscribe(new Subscriber<Result>() {
                    @Override
                    public void onNext(Result result) {
                        // do something with the result
                    }

                    ...
                });

说明:顾名思义,toList 运算符创建由源可观察对象发出的项目列表(当源可观察对象发出时,该列表仅发出一次)完成),然后使用 zip 来组合可观察结果的结果。

编辑:如果这些 Observables 可能会发出错误,您可以使用 onErrorReturn 来保持正常流程:

Observable<List<Cal>> cal = Calbackend.get(customerid)
            .onErrorReturn(new Func1<Throwable, Cal>() {
                @Override
                public Cal call(Throwable throwable) {
                    // Return something in the error case
                    return null;
                }
            })
            .toList();

关于java - 等待多个可观察对象完成并返回不同数量的元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40188888/

相关文章:

java - 安卓开发 : Parsing a XML file

java - 多次订阅 RxJava observable

java - 为什么我的 RxJava observable 没有触发订阅者?

java - RxJava 自上次事件以来忽略 X 时间的事件?

swift3 - RxSwift : Repeat a (completed) stream

java - 从其他类调用时 JFrame 未加载

java - EnumMap 和 Java 泛型

java - java中泛型方法的区别

java - 如何在并行线程中执行 observable

javascript - 我可以从链式 AJAX 调用生成一系列 Rx Observables 吗?