java - subscribeOn 新线程有时不返回

标签 java multithreading rx-java

任何人都可以看一下下面的代码并让我知道为什么它的行为不同:

package com.example.rxjava;

import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class MyRxJava {
    public static void main(String[] args) {
        System.out.println("getting sum ... by thread: " +
            Thread.currentThread().getId());
    //        observeOnNewThread();

    subOnNewThread();
    independentProcess();
    }

    private static void observeOnNewThread() {
        Observable.create(new CountObservable(100)).observeOn(Schedulers.newThread()).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer a) {
                System.out.println("processing dependent process " + a + "  ..... by thread: "+ Thread.currentThread().getId());
            }
        });
    }

    private static void subOnNewThread() {
        Observable.create(new CountObservable(100)).subscribeOn(Schedulers.newThread()).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer a) {
                System.out.println("processing dependent process " + a + "  ..... by thread: " + Thread.currentThread().getId());
            }
        });
    }

    private static void independentProcess() {
        System.out.println("precessing something else .... by thread: "+ Thread.currentThread().getId());
    }
}
<小时/>
package com.example.rxjava;

import rx.Observable;
import rx.Subscriber;

public class CountObservable implements Observable.OnSubscribe<Integer> {

private int value;

public CountObservable(int value) {
    this.value = value;
}

@Override
public void call(Subscriber<? super Integer> subscriber) {
    System.out.println("calculating sum ....  by thread: "+ Thread.currentThread().getId());

    int tot = 0;

    for (int k = 0; k<=value; k++) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        tot+=k;
    }



        subscriber.onNext(tot);
        subscriber.onCompleted();
  }
}

有时会打印

getting sum ... by thread: 1
precessing something else .... by thread: 1

有时

getting sum ... by thread: 1
precessing something else .... by thread: 1
calculating sum ....  by thread: 11

无论如何,它不会在

中打印消息
@Override
public void call(Integer a) { ... }

方法

当我运行 observeOnNewThread() 方法时,它可以工作,但我想在不同的线程中计算总和

最佳答案

RxJava 调度程序在守护线程上运行,并且您的 main 方法在它们有机会运行完成之前完成。您必须以某种方式等待它们:通过在 main 中 hibernate 、使用 toBlocking().forEach() 或其他 toBlocking() 方法。

关于java - subscribeOn 新线程有时不返回,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32495878/

相关文章:

java - 如何将异常从内部 doOnError 传播到外部 doOnError?

java - 数据库字段验证客户端

java - 线程安全否定 AtomicBoolean get() 作为 while 循环中的条件

java - 在 HTML 格式的 JEditorPane 中选择指定的文本

java - 如何防止java中同步代码中的happens-before?

c++ - 在 C++ 中多线程时出现 "no matching function call"错误

java - 使用 RxJava 和改造的格式错误的 JSON

java - RxJava : How to retain Thread from upstream

Java-eclipse-获取.exe文件

java - 如何使用另一列仅获取一列数据