java - 多次订阅 RxJava observable

标签 java reactive-programming rx-java observable

我有一个关于 RxJava Observable 的问题。 例如,我有一个 Retrofit 接口(interface),它向我返回 Observable。我需要对这个视频流做点什么。这是下载视频并将其保存到列表中的代码:

API.getVideoListObservable()
                .doOnError(t -> t.printStackTrace())
                .map(r -> r.getObjects())
                .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
                .doOnNext(l -> kalturaVideoList.addAll(l))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

API - retrofit 支架

如果想更新视频列表,我应该做同样的操作吗?或者我应该订阅,取消订阅然后再次订阅,这样:

Subscription s = API.getVideoListObservable()
                .doOnError(t -> t.printStackTrace())
                .map(r -> r.getObjects())
                .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
                .doOnNext(l -> kalturaVideoList.addAll(l))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

//some code
...

s.unsubscribe();
s = null;
s = API.getVideoListObservable()
                .doOnError(t -> t.printStackTrace())
                .map(r -> r.getObjects())
                .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
                .doOnNext(l -> kalturaVideoList.addAll(l))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

最佳答案

您的整个事件流程是错误的,您应该在订阅者中更新 UI,而不是在 doOnNext 中。

API.getVideoListObservable()
    .map(r -> r.getObjects())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(videos -> {
        // do stuff with videos
        fragment.updateVideoList(videos);
        kalturaVideoList.addAll(videos);
    }, throwable -> {
        // handle errors
        throwable.printStackTrace();
    });

Retrofit 为您设置 subscribeOn(Schedulers.io())

将所有内容移动到一个函数中,并在需要更新列表时调用它。 无需手动取消订阅,因为订阅者在收到 onCompleted 或 onError 时会自动取消订阅。

请注意,在 Android 上使用 RxJava 时,您必须处理 Activity/Fragment 生命周期和配置更改。您必须跟踪未决的网络调用,并在用户导航到另一个 Activity/Fragment 时手动取消订阅。

到目前为止,我解决所有 RxJava + Android 问题的首选方法是使用 https://github.com/evant/rxloader图书馆。使用 RxLoader 的代码将如下所示:

private Observable<List<Video>> getVideos() {
    return API.getVideoListObservable()
        .map(r -> r.getObjects());
}

// in onCreate()
RxLoaderManager loaderManager = RxLoaderManager.get(this);
RxLoader<List<Video>> videoLoader = loaderManager.create(
    getVideos().observeOn(AndroidSchedulers.mainThread()),
    new RxLoaderObserver<List<Video>>() {
        @Override
        public void onNext(List<Video> videos) {
            // do stuff with videos
            fragment.updateVideoList(videos);
            kalturaVideoList.addAll(videos);
        }

        @Override
        public void onError(Throwable throwable) {
            // handle errors
            throwable.printStackTrace();
        }
    });

// whenever you need to update videos
videoLoader.restart();

关于java - 多次订阅 RxJava observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30373385/

相关文章:

java - 查看 mojarra 上的过期问题

java - 如何在多线程应用程序中高效使用 RestTemplate?

ios ->- 运算符在 RxSwift 中是什么意思,它在哪里记录?

javascript - 在 React Native 中使用 this.props.children 加载大组件是否存在性能问题?

Android O 后台服务生命周期影响 RxJava Observable?

java - 混合 Java 流 + Rxjava 安全吗

java - 在java中索引Treemap值

java - 根据 JComboBox 选择添加 JList 项目

c# - 为什么我用 Repeat() 得到不确定的结果

java - RxJava 错误处理