java - RxJava 按需加载项目

标签 java android rx-java rx-java2

当其他一些可观察对象发出新项目(触发器)时,我想加载下一个项目。

这是发出项目的代码:

public Observable<Item> get() {
    return idApi.get().flatMap(new Function<List<String>, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(@NonNull List<String> ids) throws Exception {
            return Observable.fromIterable(ids);
        }
    }).flatMap(new Function<String, ObservableSource<Item>>() {
        @Override
        public ObservableSource<Item> apply(@NonNull final String id) throws Exception {
            return dataApi.get(id).map(new Function<Data, Item>() {
                @Override
                public Item apply(@NonNull Data data) throws Exception {
                    return new Item(data , id);
            });
        }
    });
}

触发可观察:

RxView.clicks(view.findViewById(R.id.button_more)).debounce(500, TimeUnit.MILLISECONDS);

解决这个问题的唯一方法是使用 Subject 并保存对 ids 列表的引用,这不优雅且看起来不具有反应性。

编辑:这是我到目前为止的解决方案,但我必须直接订阅触发事件。我不认为它优雅。

@Override
public Observable<Item> get(final Observable<Object> trigger) {
    final PublishSubject<Item> subject = PublishSubject.create();
    return idApi.get().flatMap(new Function<List<String>, ObservableSource<Queue<String>>>() {
        @Override
        public ObservableSource<Queue<String>> apply(@NonNull List<String> ids) throws Exception {
            final Queue<String> q = new LinkedList<>(ids);
            return Observable.just(q);
        }
    }).flatMap(new Function<Queue<String>, ObservableSource<Item>>() {
        @Override
        public ObservableSource<Item> apply(@NonNull final Queue<String> ids) throws Exception {
            trigger.subscribeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {
                @Override
                public void accept(@NonNull Object o) throws Exception {
                    if (ids.size() > 0) {
                        final String id = ids.poll();
                        dataApi.get(id).map(new Function<Data, Item>() {
                            @Override
                            public Item apply(@NonNull Data data) throws Exception {
                                return new Item(data, id) l
                            }
                        }).subscribe(new Consumer<Item>() {
                            @Override
                            public void accept(@NonNull Item item) throws Exception {
                                subject.onNext(item);
                            }
                        });
                    } else {
                        subject.onComplete();

                    }
                }
            });
            return subject;
        }
    });
}

最佳答案

使用压缩包

public Observable<Item> get(View v) {
    return idApi.get().flatMap(new Function<List<String>, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(@NonNull List<String> ids) throws Exception {
            return Observable.fromIterable(ids);
        }
    }).zipWith(RxView.clicks(v).debounce(500, TimeUnit.MILLISECONDS), (n, i) -> n))    
    .flatMap(new Function<String, ObservableSource<Item>>() {
        @Override
        public ObservableSource<Item> apply(@NonNull final String id) throws Exception {
            return dataApi.get(id).map(new Function<Data, Item>() {
                @Override
                public Item apply(@NonNull Data data) throws Exception {
                    return new Item(data , id);
            });
        }
    });
}

每次点击获得 N 个项目

public Observable<Item> getN(View v, int nitems) {
    return idApi.get().flatMap(new Function<List<String>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull List<String> ids) throws Exception {
                return Observable.fromIterable(ids);
            }
        }).buffer(nitems).zipWith(RxView.clicks(v).debounce(500, TimeUnit.MILLISECONDS), (n, i) -> n))
        .flatMap(new Function<List<String>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull final List<String> ids) throws Exception {
                return Observable.from(ids)
            }
         }
        )  
        .flatMap(new Function<String, ObservableSource<Item>>() {
            @Override
            public ObservableSource<Item> apply(@NonNull final String id) throws Exception {
                return dataApi.get(id).map(new Function<Data, Item>() {
                    @Override
                    public Item apply(@NonNull Data data) throws Exception {
                        return new Item(data , id);
                 });
               }
         }
        });
 }

编辑:您仍然需要使用 subscribeOn 来确保您位于 RXView.clicks 的主线程上以及任何网络的 IO 线程上。

关于java - RxJava 按需加载项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43364077/

相关文章:

Android RxJava,非阻塞?

android - 什么是 RxJava 等同于 orElse

java - 优化 HibernateTemplate 的数据插入

java - http请求的线程池

java - 无法通过Java eclipse连接到Google云SQL

java - ReactiveX:计算 Observable 中不同元素的频率

java - GWT 中的客户端缓存

android - 无法解决本地的 tapandpay 依赖关系

javascript - navigator.geolocation.getCurrentPosition() 在 Android 上的 WebView 中永远不会返回

安卓磨损 : Programmatically stop my app's notifications from appearing on watch