java - 主线程中的SubscribeOn和observeOn

标签 java rx-java reactive-programming rx-java2

我尝试将 subscribeOn 和 obsereOn 与执行器一起使用,以便在异步任务完成后我可以返回主线程。 我最终得到了这段代码,但它不起作用

@Test
    public void testBackToMainThread() throws InterruptedException {
        processValue(1);
        processValue(2);
        processValue(3);
        processValue(4);
        processValue(5);
//        while (tasks.size() != 0) {
//            tasks.take().run();
//        }
        System.out.println("done");
    }

    private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();


    private void processValue(int value) throws InterruptedException {
        Observable.just(value)
                .subscribeOn(Schedulers.io())
                .doOnNext(number -> processExecution())
                .observeOn(Schedulers.from(command -> tasks.add(command)))
                .subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
        tasks.take().run();
    }

    private void processExecution() {
        System.out.println("Execution in " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

知道如何实现我想要的吗?

当我运行时,我只打印

Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done

问候

最佳答案

您的方法的问题在于,您无法知道在给定时间应该执行多少个任务,并且在等待解锁主线程后应该发生的任务时也不会出现死锁。

据我所知,任何 1.x 扩展都不支持返回 Java 主线程。对于 2.x,有 BlockingScheduler来自允许您执行此操作的扩展项目:

public static void main(String[] args) {
    BlockingScheduler scheduler = new BlockingScheduler();

    scheduler.execute(() -> {
        Flowable.range(1,10)
        .subscribeOn(Schedulers.io())
        .observeOn(scheduler)
        .doAfterTerminate(() -> scheduler.shutdown())
        .subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
    });

    System.out.println("BlockingScheduler finished");
}

请注意对 scheduler.shutdown() 的调用,最终必须调用它来释放主线程,否则您的程序可能永远不会终止。

关于java - 主线程中的SubscribeOn和observeOn,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45648686/

相关文章:

java - 当关闭相关(相同)entityManager 时,entityManager 先前找到或获取的实体怎么样?

java - Retrofit2 + RxJava + Jackson 默默失败

mvvm - 在哪里绑定(bind) MVVM 中的 observables?

android - 订阅后一次性对象为空

java - 将 observable 转换为列表

java - 如何修复 android.support 和 firebase 错误?

java - ExchangeFilterFunction 在 WebClient react 流之外执行代码?

java - HttpServletResponse.setHeader 方法应该如何处理 null 值?

java - RxJava2 中一次从列表中取出 n 个元素

rx-java - Rx 跳过,直到经过几秒