java - RX : Run Zipped Observables in parallel?

标签 java system.reactive rx-java

所以我在玩 RX(真的很酷),我一直在转换我的 api,它访问 Android 中的 sqlite 数据库以返回 observables。

所以自然而然地,我开始尝试解决的问题之一是,“如果我想进行 3 次 API 调用,获取结果,然后在它们全部完成后进行一些处理怎么办?”

我花了一两个小时,但我最终找到了 Zip Functionality它可以帮助我轻松解决问题:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

太棒了!所以这很酷。

所以当我压缩 3 个 observable 时,它​​们会串行运行。如果我希望它们全部同时并行运行以便最终更快地获得结果怎么办?我尝试了一些东西,甚至尝试阅读人们编写的一些原始 RX 内容在 C# 中。我相信有一个简单的答案。谁能指出我正确的方向?执行此操作的正确方法是什么?

最佳答案

zip 确实 并行运行 observables - 但它也串行订阅它们。由于您的 getNumberedObservable 是在订阅方法中完成的,因此它给人以串行运行的印象,但实际上并没有这样的限制。

您可以尝试使用一些比订阅逻辑生命周期更长的长时间运行的 Observable,例如 timer,或者使用 subscribeOn 方法异步订阅传递给 压缩

关于java - RX : Run Zipped Observables in parallel?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21201083/

相关文章:

c# - 删除可观察序列中除最后一个样本之外的任何样本,直到消费者准备好

android - 如何在Retrofit中解析Restful api JSON响应?

java - Android Studio Chipmunk 自动创建模块后无法解析测试中的生产代码

java - 为什么相同的字体在从 Netbeans 运行的 Java 应用程序和从 Jar 运行的 Java 应用程序中看起来不同?

c# - 如何使用Observables实现轮询?

multithreading - RxScala : How to keep the thread doing Observable.间隔还活着吗?

rx-java - 也许可以完成

Java:将 JScrollPane 添加到 JTextArea

java - 使用免费数据库和带有 Spring & Hibernate Web 应用程序的 Java 进行冗余数据库复制

javascript - 如何从函数创建 Observable?