Can anyone take a look at the code below and tell me why it behaves differently from run to run?
package com.example.rxjava;

import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class MyRxJava {

    public static void main(String[] args) {
        System.out.println("getting sum ... by thread: " + Thread.currentThread().getId());
        // observeOnNewThread();
        subOnNewThread();
        independentProcess();
    }

    private static void observeOnNewThread() {
        Observable.create(new CountObservable(100))
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer a) {
                        System.out.println("processing dependent process " + a + " ..... by thread: " + Thread.currentThread().getId());
                    }
                });
    }

    private static void subOnNewThread() {
        Observable.create(new CountObservable(100))
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer a) {
                        System.out.println("processing dependent process " + a + " ..... by thread: " + Thread.currentThread().getId());
                    }
                });
    }

    private static void independentProcess() {
        System.out.println("precessing something else .... by thread: " + Thread.currentThread().getId());
    }
}
package com.example.rxjava;

import rx.Observable;
import rx.Subscriber;

public class CountObservable implements Observable.OnSubscribe<Integer> {

    private int value;

    public CountObservable(int value) {
        this.value = value;
    }

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        System.out.println("calculating sum .... by thread: " + Thread.currentThread().getId());
        int tot = 0;
        for (int k = 0; k <= value; k++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            tot += k;
        }
        subscriber.onNext(tot);
        subscriber.onCompleted();
    }
}
Sometimes it prints
getting sum ... by thread: 1
precessing something else .... by thread: 1
and sometimes
getting sum ... by thread: 1
precessing something else .... by thread: 1
calculating sum .... by thread: 11
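Either way, it never prints the message from the
@Override
public void call(Integer a) { ... }
method.
When I run the observeOnNewThread() method instead, it works, but I want the sum to be calculated on a different thread.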
Best answer
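RxJava schedulers run on daemon threads, and your main method returns before they get a chance to run to completion. Once main returns, the JVM exits without waiting for daemon threads, so the calculation is cut off part-way (or never starts). You have to wait for them somehow: by sleeping in main, by using toBlocking().forEach(), or by using one of the other toBlocking() methods.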
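The underlying mechanism can be reproduced without RxJava. The following is a minimal plain-JDK sketch, not the RxJava API itself: the class name DaemonWaitSketch and the method sumOnDaemonThread are made up for illustration, and the CountDownLatch only stands in for the blocking that toBlocking() would do in the RxJava version. It runs the sum on a daemon thread and makes the caller wait for the result, so the JVM cannot exit before the work is finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of RxJava or of the question's code.
class DaemonWaitSketch {

    // Sums 0..value on a daemon thread and blocks the caller until the result is ready.
    static int sumOnDaemonThread(int value) {
        AtomicInteger result = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            int tot = 0;
            for (int k = 0; k <= value; k++) {
                tot += k;
            }
            result.set(tot);
            done.countDown(); // signal that the result is available
        });
        // Daemon threads do not keep the JVM alive, as with RxJava's scheduler threads.
        worker.setDaemon(true);
        worker.start();

        try {
            // Without this wait, the caller could return (and the JVM exit) before the worker finishes.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the sum", e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println("sum = " + sumOnDaemonThread(100)); // prints "sum = 5050"
    }
}
```

Removing the done.await() call brings back the behavior from the question: main may return before the worker has stored its result.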
Source: "java - subscribeOn new thread sometimes does not return" on Stack Overflow: https://stackoverflow.com/questions/32495878/