java - RxJava 流 : conditional operators and error handling

标签 java reactive-programming rx-java rx-android

我是 RxJava 的新手,并尝试围绕具有三个要处理的异步方法的更复杂的登录逻辑。对我来说,这是“如果我把这个东西转换成 RxJava,任何事情 (tm) 都是可能的”:)

所以我想做的是:

Call A -> (Process A) -> Call B with results of A -> (Process B) -\
                    \                                              -> Combine and Subscribe
                     \-> Call C with results of A -> (Process C) -/

现在的问题是 Call C 分支应该只在特定条件下执行,而不是其他情况(Combine and Subscribe 然后可以收到一个 NULL 来自该分支的值,这没问题。

此外,错误处理非常重要:虽然 Call ACall C(如果它运行的话)需要通过 onError 传递它们的错误 对于最终订阅者,Call B 的“成功”是可选的,在失败的情况下可以忽略。

这是我到目前为止的想法,它仍然忽略了“C”分支:

 mApi.callA(someArgs)
            // a transition operator to apply thread schedulers
            .compose(applySchedulers())
            // from rxlifecycle-components, unsubscribes automatically 
            // when the activity goes down  
            .compose(bindToLifecycle())
            // possibly other transformations that should work on the (error)
            // states of the first and the following, chained API calls
            .flatMap(response -> processA(response))
            .flatMap(response -> mApi.callB(response.token))
            .flatMap(response -> processB(response))
            // redirects HTTP responses >= 300 to onError()
            .lift(new SequenceOperators.HttpErrorOperator<>())
            // checks for application-specific error payload and redirects that to onError()
            .lift(new SequenceOperators.ApiErrorOperator<>())
            .subscribe(this::allDone this::failure);

我环顾四周 Wiki for conditional operators ,但我找不到如何启动 Call C 分支的提示。

此外,我不确定我的 SequenceOperators 是否以这种方式工作,即是否可以放在链中所有请求之后,或者我是否需要其中的几个,每个都放在 flatMap( ) 运算符触发新的 Call

谁能指出我正确的方向?

谢谢!

最佳答案

你应该使用 Zip运算符 :) 结果应该是这样的:

mApi.callA(someArgs)
        // ...
        .flatMap(response -> processA(response))
        .flatMap(response -> {
              return Observable.zip(
                    callB(response),
                    callC(response),
                    (rA,rB) -> {
                          // or just return a new Pair<>(rA, rB)
                          return combineAPlusB(rA,rB)
                    }
              )
        })
        // ...
        .subscribe(this::allDone this::failure);

关于java - RxJava 流 : conditional operators and error handling,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33108703/

相关文章:

java - volatile 变量读取行为

java - 无法使该Hadoop/级联代码毫无异常(exception)地运行?两者都是新手

java - 继承和关联建模,怎么做?

java - Gradle 传递依赖项不起作用?

c# - 实现一个可观察的设置类

android - 使用 RxSwift 异步加载 UITableView 单元格

android - 如何暂停/恢复 Observable?

javascript - RXJS 可观察拉伸(stretch)

.net - 使用 2 个 int 参数订阅 Delegate

android - 在 Rx 链中多次切换线程