java - RxJava : How to . 压缩两个 Observable,然后 .merge 它们并最终 .reduce 以聚合所有结果

标签 java rx-java

我有以下代码:

public void foo() {
    Long[] gData = new Long[] { 1L, 2L };

    rx.Observable.from(gData)
    .concatMap(data -> {

        rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1);
        rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2);


        return rx.Observable.zip(depositObs1, depositObs2, (depositObj1, depositObj2) -> {

            depositObj1.putNumber("seat_index", data);
            depositObj2.putNumber("seat_index", data);

            return rx.Observable.merge(
                    rx.Observable.just(depositObj1),
                    rx.Observable.just(depositObj2));
        })
    })
    .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {

        int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
        long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
        payoutArr.add(seatIndex, payout);
        return payoutArr;
    })
    .subscribe(results -> {
        System.out.println(results);
    });
}

此代码使用 .zip 发送到可观察对象,然后它添加一个“seat_index”属性并调用 .merge 以使用 .reduce,因此最终所有结果将聚合到一个 ArrayList 中。

这段代码有一个问题:当 .reduce 处理它的输入时,它将它作为 Observable 而不是 GmObject ...什么函数可以从它的 Observable 包装中“提取” GmObject?

这样使用 rxJava 有意义吗?或者有更好的技术?

谢谢!

最佳答案

zip运算符将 lambda 作为第三个参数。这个 lambda 是一个 2 args 函数,它返回一个对象,该对象是 args 组合的结果。而不是 Observable组合的结果(但是,当然,对象可以是 Observable ,但在您的情况下这不是您想要的)。

所以在你的 zip 之后打电话,你会得到一个Observable<Observable<GmObject>>但你期待一个Observable<GmObject> .

我不认为 zip operator 是您要查找的运算符(operator)。

public void foo() {
    Long[] gData = new Long[] { 1L, 2L };

    rx.Observable.from(gData)
    .concatMap(data -> {

        rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1).doOnNext(obj -> obj.putNumber("seat_index", data));
        rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2).doOnNext(obj -> obj.putNumber("seat_index", data));


        return rx.Observable.merge(depositObs1, depositObs2);
    })
   .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {

        int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
        long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
        payoutArr.add(seatIndex, payout);
        return payoutArr;
   })
   .subscribe(results -> System.out.println(results));
}

关于java - RxJava : How to . 压缩两个 Observable,然后 .merge 它们并最终 .reduce 以聚合所有结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30030197/

相关文章:

java - OptaPlanner 在 CartesianProductMoveSelector 创建的 CompositeMove 上抛出 IllegalStateException

java - gluLookAt 第一人称视角

java - RxJava - 在完成之前触发另一个 Observable/Completable 并且仅在完成之后触发

java - RxJava - 合并 2 个调用

java - 检查 RxJava 中是否有订阅者抛出异常

java - RxJava - 使用 Single.Error/Observable.error 与抛出异常

java - Android 本地化查询

java - (Java8)源码中ArrayList的多个实现有什么区别

Java - 保存 XML 后保留空格

android - 调试未收到消息的 RxJava 问题的最佳方法是什么