java - RxJava 组合可观察量而不重复执行

标签 java rx-java rx-android rx-java2

短篇故事: 我遇到的情况是,我有 2 个具有单一目的的 Observable:

  • 他们收到一些数据
  • 它们返回修改后的数据
  • 如果数据无法处理则抛出错误

它们各自负责处理不同类型的数据。另外,我想在处理完这两个数据后做一些事情。

我当前的最佳实现如下,这些是我的可观察对象:

    Single<BlueData> blueObservable = Single.create(singleSubscriber -> {
        if (BlueDataProcessor.isDataValid(myBlueData)) {
            singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData));
        }
        else {
            singleSubscriber.onError(new BlueDataIsInvalidThrow());
        }
    });

    Single<RedData> redObservable = Single.create(singleSubscriber -> {
        if (RedDataProcessor.isDataValid(myRedData)) {
            singleSubscriber.onSuccess(RedDataProcessor.process(myRedData));
        }
        else {
            singleSubscriber.onError(new RedDataIsInvalidThrowable());
        }
    });

    Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable,
            (blueData, redData) -> PurpleGenerator.combine(blueData, redData));

我还有以下订阅:

    blueObservable.subscribe(
            result -> {
                saveBlueProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });

    redObservable.subscribe(
            result -> {
                saveRedProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });


    composedSingle.subscribe(
            combinedResult -> {
                savePurpleProcessStats(combinedResult)
            },
            throwable -> {
                logError(throwable);
            });

我的问题: 蓝色和红色数据被处理两次,因为两个订阅都再次运行,我订阅了使用 Observable.zip() 创建的组合可观察量。

如何在不运行这两个操作两次的情况下实现此行为?

最佳答案

这对于 1.x 中的 Single 来说是不可能的,因为没有 ConnectableSingle 的概念,因此没有 Single.publish。您可以通过2.x和RxJava2Extensions库来实现效果:

SingleSubject<RedType> red = SingleSubject.create();
SingleSubject<BlueType> blue = SingleSubject.create();

// subscribe interested parties
red.subscribe(...);
blue.subscribe(...);

Single.zip(red, blue, (r, b) -> ...).subscribe(...);

// connect()
blueObservable.subscribe(blue);
redObservable.subscribe(red);

关于java - RxJava 组合可观察量而不重复执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40638488/

相关文章:

android - RxJava2 & Retrofit2 服务类 HTTP 响应码

android - IO 上的 RxJava 竞争条件

java - 在屏幕上移动对象,无延迟/滞后

java - 天数 : tmap Nullpointer exception while merging two CSV files

android - 如何控制自动出现不确定进度圈

java - 回顾一下RxJava中的链条

java - RxJava 从 Observable 更新数据

java - Play Framework : Returning 400 instead of 415

java - 安装 SSL 证书后站点显示 'Not Secure'

java - RxJava : Publish subject doOnSubscribe never gets called