java - 并行执行的可观察量

标签 java reactive-programming rx-java reactivex

我正在使用reactiveX Zip做一些实验,我注意到我在zip中定义的可观察量是一个接一个地按顺序执行的。我认为 zip 的好处在于 zip 中定义的每个可观察量都由一个线程执行,因此所有这些都是并行执行的。有什么办法可以实现我想要的吗? 这是我的 zip 示例

         @Test
public void testZip() {
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                                                                     .concat(s3))
              .subscribe(System.out::println);
}

public Observable<String> obString() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("hello");
}

public Observable<String> obString1() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just(" world");
}

public Observable<String> obString2() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("!");
}

最佳答案

你看错了东西。

obString* 都在同一个线程上执行,因为它们是在 testZip 中调用它们时执行的。

您想要查看的是可观察对象中发生的情况,这不可能仅使用仅仅,您需要一个自定义可观察对象并查看onSubscribe 正文中的当前线程。

此外,您可能希望使用 scheduleOn 为您的 Observable 提供一个专门的新线程或线程池。

关于java - 并行执行的可观察量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34385912/

相关文章:

ios - 检测 RACSignal 的最佳实践

rx-java - 如何使用 RxJava 并行调用集合中每个元素的方法

android - RxJava 使用 onComplete 每 x 秒调用一次服务

java - Java 线程 wait() 和 Notify() 似乎工作异常

java - 可扩展类的哈希码(面向 future )

java - 无法连接到java中的Access数据库文件

objective-c - RACSignal : how to reduce on arbitrarily large combine

java - 如何在项目 react 器中组合来自两个 react 流的数据?

android - 由于flatMap未完成,因此FlatMap失败后RxJava使用toList

java - 什么是 map[(int)x]++; 它有什么作用?