java - 如何在 RxJava 中处理 dispose 而不会出现 InterruptedException

标签 java rx-java rx-java2 interrupted-exception

在下面的代码中,当调用 dispose() 时,发射器线程会被中断(InterruptedException 从 sleep 方法中抛出)。

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();

从调试 session 中我发现中断源自 FutureTask,该中断在处理期间被取消。在那里,调用 dispose() 的线程将与运行器线程进行检查,如果不匹配,则发射器将被中断。由于我使用了计算 Scheduler,因此线程有所不同。

是否有任何方法可以使 dispose 不中断此类发射器,或者实际上应该如何处理?我发现这种方法的一个问题是,当我有一个可中断操作(此处通过 sleep 模拟)时,我希望在调用 onComplete() 之前正常完成该操作。

最佳答案

请引用What's different in 2.0 - Error handling .

One important design requirement for 2.x is that no Throwable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.

因此,您可以将所有内容包装在 try/catch 中并正确发出错误:

Observable<Integer> obs = Observable.create(emitter -> {
   try {
      // ...
   } catch (InterruptedException ex) {
      // check if the interrupt is due to cancellation
      // if so, no need to signal the InterruptedException
      if (!disposable.isDisposed()) {
         observer.onError(ex);
      }
   }
});

或者设置一个全局错误消费者来忽略它:

RxJavaPlugins.setErrorHandler(e -> {
    // ..
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    // ...
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

关于java - 如何在 RxJava 中处理 dispose 而不会出现 InterruptedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56382253/

相关文章:

java - 在 Mac OS X 上运行 Ant 构建时出现 "Permission denied"错误

Android Dagger2 + OkHttp + Retrofit 依赖循环错误

java - 将 JSON 与嵌套数组和 json 进行比较(数组顺序无关紧要)

java - 如何将带有图像的 JButton 放在另一个带有图像的 JButton 之上?

java - Android Studio 中的电子邮件有多行

android - Retrofit2 请求Body前面不同括号的奇怪组合

java - 如何使用 Rxjava2 在 Room 数据库中的 Textview 上显示 Flowable 数据

android - 使用 concat 从 DB 请求数据,如果它从 Service 中空获取不工作?

java - NetBeans Gradle项目

java - 用单线程写 LMAX