java - 在另一个 Observable 中订阅一个 Observable

标签 java rx-java reactivex

我正在调试我的代码,它获取 UserWallet从数据库中获取地址,然后通过连接外部 REST API 为其生成地址。现在我有一个订阅嵌套在另一个订阅中,但我读到这是一个糟糕的解决方案(它实际上不起作用,我认为这就是原因)。

userWalletDao.getUnregisteredUserWallets()
                .subscribe(nextWallet -> {
                    log.info("Fetched next wallet for registration {}", nextWallet);
                    blockchainIntegration.registerUserWallet(nextWallet.getUserId())
                            .subscribe(address -> {
                                nextWallet.setAddress(address);
                                userWalletDao.persistUserWalletAddress(nextWallet);
                                log.info("Registered wallet {} with address {}.", nextWallet, address);
                            });
                });

我试图在一次订阅中完成它,但如果我将钱包平面映射到地址,我会丢失 UserWallet对象来为其设置获取的地址并将其保留回数据库中。

如何获取钱包,然后调用 API 为其生成一个订阅地址?

getUnregisteredUserWallets()返回Observable<UserWallet>registerUserWallet()返回Single<String> .

最佳答案

强烈建议阅读并理解第一条评论中提到的依赖子流。

您可以通过将可观察序列更改为这样的内容来解决您的问题

       userWalletDao.getUnregisteredUserWallets()
                .flatMap(nextWallet -> registerUserWallet(nextWallet.getUserId()).toObservable()
                        .flatMap(address -> Observable.fromCallable(() -> new Pair<>(nextWallet, address))))  // return both wallet from previous mapping and address from current mapping to the next level
                .flatMapCompletable(walletAddressPair -> Completable.fromAction(()->{
                    Wallet nextWallet = walletAddressPair.first;
                    String address = walletAddressPair.second;
                    nextWallet.setAddress(address);
                    userWalletDao.persistUserWalletAddress(nextWallet);
                    log.info("Registered wallet {} with address {}.", nextWallet, address);
                    // here wallet and address have been saved to db. This operation is a completable action, you don't have to return any result 
                    // from it and forward to the next level.  Thats why flatMapCompletable is used.
                }))
                .subscribeWith(new DisposableCompletableObserver() {
                    @Override
                    public void onComplete() {
                       // All actions completed
                    }

                    @Override
                    public void onError(Throwable e) {
                      // any error occurred in the observable chain
                    }
                });

关于java - 在另一个 Observable 中订阅一个 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51150411/

相关文章:

ios - RxSwift - 双向绑定(bind)以使用同一屏幕编辑和添加模型

Java 配置 Spring 安全 FORM_LOGIN_FILTER

java - 有没有办法在给定文件中获取Java中的根目录

java - 如何使使用全局动态属性的代码可进行单元测试?

java - 如何让下面的代码在alfresco中正常工作?

android - 函数不在后台线程上执行

java - 如何使用 RxJava 在 onNext() 中抛出错误

rx-java - RxJs 中 Observable.expand() 的 RxJava 等价物是什么?

java - io.reactivex.exceptions.UndeliverableException 异常无法传递给消费者,因为它已经取消/处置

ios - RxSwift : Receive events immediately, 除非最后一个事件在某个时间间隔内被处理