我正在使用reactiveX Zip做一些实验,我注意到我在zip中定义的可观察量是一个接一个地按顺序执行的。我认为 zip 的好处在于 zip 中定义的每个可观察量都由一个线程执行,因此所有这些都是并行执行的。有什么办法可以实现我想要的吗? 这是我的 zip 示例
@Test
public void testZip() {
Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(System.out::println);
}
public Observable<String> obString() {
System.out.println(Thread.currentThread().getId());
return Observable.just("hello");
}
public Observable<String> obString1() {
System.out.println(Thread.currentThread().getId());
return Observable.just(" world");
}
public Observable<String> obString2() {
System.out.println(Thread.currentThread().getId());
return Observable.just("!");
}
最佳答案
你看错了东西。
obString*
都在同一个线程上执行,因为它们是在 testZip
中调用它们时执行的。
您想要查看的是可观察对象中发生的情况,这不可能仅使用仅仅
,您需要一个自定义可观察对象并查看onSubscribe
正文中的当前线程。
此外,您可能希望使用 scheduleOn
为您的 Observable
提供一个专门的新线程或线程池。
关于java - 并行执行的可观察量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34385912/