在下面的代码中,当调用 dispose()
时,发射器线程会被中断(InterruptedException
从 sleep 方法中抛出)。
Observable<Integer> obs = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (emitter.isDisposed()) {
System.out.println("> exiting.");
emitter.onComplete();
return;
}
emitter.onNext(i);
System.out.println("> calculation = " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
emitter.onComplete();
});
Disposable disposable = obs
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
从调试 session 中我发现中断源自 FutureTask
,该中断在处理期间被取消。在那里,调用 dispose()
的线程将与运行器线程进行检查,如果不匹配,则发射器将被中断。由于我使用了计算 Scheduler
,因此线程有所不同。
是否有任何方法可以使 dispose 不中断此类发射器,或者实际上应该如何处理?我发现这种方法的一个问题是,当我有一个可中断操作(此处通过 sleep 模拟)时,我希望在调用 onComplete()
之前正常完成该操作。
最佳答案
请引用What's different in 2.0 - Error handling .
One important design requirement for 2.x is that no Throwable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.
因此,您可以将所有内容包装在 try/catch 中并正确发出错误:
Observable<Integer> obs = Observable.create(emitter -> {
try {
// ...
} catch (InterruptedException ex) {
// check if the interrupt is due to cancellation
// if so, no need to signal the InterruptedException
if (!disposable.isDisposed()) {
observer.onError(ex);
}
}
});
或者设置一个全局错误消费者来忽略它:
RxJavaPlugins.setErrorHandler(e -> {
// ..
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
// ...
Log.warning("Undeliverable exception received, not sure what to do", e);
});
关于java - 如何在 RxJava 中处理 dispose 而不会出现 InterruptedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56382253/