我正在实现Observable
,它应该只从网络下载一个文件,分为多个部分。
这是我的实现
private static class MultipartDownloadObservable extends Observable<String> implements Cancellable {
@Override
protected void subscribeActual(Observer<? super String> observer) {
// Start loading
subscriber = observer;
progressBundle = new ProgressBundle<>();
startDownloading();
}
@Override
public void cancel() throws Exception {
if (!lastDownloadingInfo.isFinish()) {
okHttpClient.dispatcher().queuedCalls().stream().filter(call -> call.request().tag().equals(url)).forEach(Call::cancel);
}
if (subscriber != null) {
subscriber.onComplete();
}
}
}
在我的代码中,我创建此类的一个实例,然后对从 subscribe 方法返回的 Disposable
对象调用 dispose
方法。
downloadSubscription.dispose();
但是它会抛出错误。
java.io.InterruptedIOException: thread interrupted
如何正确实现Observable
的处置?我尝试实现 Canceble
接口(interface),但是未调用 cancel
方法。
最佳答案
请注意,java.io.InterruptedIOException:线程中断
不是来自rx。这是 java.io 的一个异常(exception)。对于您的 dispose()
操作,rxJava 被迫取消订阅,并在网络请求执行期间中断线程。
您无法真正告诉可观察对象取消网络请求,因为 rxJava 不知道可观察对象内部运行的代码。你可以做的是使用doOnUnsubscribe
检查网络请求是否仍在运行,如果是取消,那么当你想取消时取消订阅即可。
关于java - 如何在 Observable RxJava 中处理处置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47552523/