java - 如何在 Observable RxJava 中处理处置

标签 java android rx-java rx-android rx-java2

我正在实现Observable,它应该只从网络下载一个文件,分为多个部分。

这是我的实现

 private static class MultipartDownloadObservable extends Observable<String> implements Cancellable {
     @Override
        protected void subscribeActual(Observer<? super String> observer) {
            // Start loading
            subscriber = observer;
            progressBundle = new ProgressBundle<>();
            startDownloading();
        }

        @Override
        public void cancel() throws Exception {
            if (!lastDownloadingInfo.isFinish()) {
                okHttpClient.dispatcher().queuedCalls().stream().filter(call -> call.request().tag().equals(url)).forEach(Call::cancel);
            }
            if (subscriber != null) {
                subscriber.onComplete();
            }
        }
}

在我的代码中,我创建此类的一个实例,然后对从 subscribe 方法返回的 Disposable 对象调用 dispose 方法。

 downloadSubscription.dispose();

但是它会抛出错误。

java.io.InterruptedIOException: thread interrupted

如何正确实现Observable的处置?我尝试实现 Canceble 接口(interface),但是未调用 cancel 方法。

最佳答案

请注意,java.io.InterruptedIOException:线程中断不是来自rx。这是 java.io 的一个异常(exception)。对于您的 dispose() 操作,rxJava 被迫取消订阅,并在网络请求执行期间中断线程。

您无法真正告诉可观察对象取消网络请求,因为 rxJava 不知道可观察对象内部运行的代码。你可以做的是使用doOnUnsubscribe检查网络请求是否仍在运行,如果是取消,那么当你想取消时取消订阅即可。

关于java - 如何在 Observable RxJava 中处理处置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47552523/

相关文章:

android - 横幅和插页式广告没有显示?

java - 如何创建JavaFX 16位灰度图像

java - 使用 iText 在 Java 中将多个表转为 PDF 报告

java - Clojure 中是否有适用于 Java 函数的应用函数?

android - Expo EAS 构建在使用 `process.env.X` 访问时不会暴露 secret

java - 是否有更干净的方法使用 RxJava 来适应标准观察者/监听器服务?

java - 访问/gcm-demo/sendAll。原因: HTTP Status Code: 401

Android SMSManager 一般故障

java - RxThreadFactory没有实现并发接口(interface)

java - 订阅后合并PublishSubject