java - 如何将 Observable.fromIterable 中的项目索引传递给 subscribe 方法中的 onNext ?

标签 java android retrofit2 reactive-programming rx-java2

我正在尝试使用 RxJava2 加载数据并将其放入 SparseArray 中。我通过从数组调用 URL 来获取数据,但我需要解析响应并将其按照数组中 URL 的顺序插入到 SparseArray 中,因此我需要传递 mUrls.getGroups() 中的 String 项的索引。

提前致谢!

@GET
Single<ResponseBody> getChannels(@Url String url);


groups = new SparseArray<>();


Observable.fromIterable(mUrls.getGroups())
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        //
        // How can I access the index of the String item in the array?
        //
        .subscribe(new Observer<ResponseBody>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(ResponseBody responseBody) {
                Group group = GroupParser.parseList(responseBody.byteStream(), index);


                groups.put(index, group);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onComplete() {

            }
 });

编辑:

这是已实现的解决方案:

 Observable.defer(() -> {
                        AtomicInteger counter = new AtomicInteger();
                        return Observable.fromIterable(mUrls.getGroups())
                                .map(url -> new Pair(url, counter.getAndIncrement()));
                    }).flatMapSingle(pair ->
                            aPI.getChannels(pair.first.toString())
                                    .map(responseBody -> new Pair(responseBody, pair.second))
                                    .subscribeOn(Schedulers.io())
                    )
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Pair>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Pair pair) {
                            Pair<ResponseBody, Integer> resultPair =  (Pair<ResponseBody, Integer>) pair;
                            Group group = GroupParser.parseList(resultPair.first.byteStream(),
                                    resultPair.second);

                            groups.put(resultPair.second, group);
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "***** message: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.i(TAG, "***** onComplete.");
                        }
                    });

最佳答案

如果您按顺序处理 URL,则只需在 Observer 中引入 index 字段即可:

Observable.fromIterable(mUrls.getGroups())
    .concatMapSingle(url -> getChannels(url))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<ResponseBody>() {

         int index;  // <----------------------------------------------------

         // ...

         @Override
         public void onNext(ResponseBody responseBody) {
             Group group = GroupParser.parseList(responseBody.byteStream(), index);


             groups.put(index, group);

             index++;  // <---------------------------------------------------------
         }

         // ...
    });

但是,如果您同时处理 URL,则必须将每个 URL 与索引配对并对其进行标记。例如,给定一个 Pair 类:

 Observable.defer(() -> {
     AtomicInteger counter = new AtomicInteger();
     return Observable.fromIterable(mUrls.getGroups())
     .map(url -> Pair.of(url, counter.getAndIncremenet()));
 })
 .flatMapSingle(urlIndex -> 
     getChannels(urlIndex.first)
     .map(v -> Pair.of(v, urlIndex.second))
     .subscribeOn(Schedulers.io())
 )
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer<Pair<ResponseBody, Integer>>() {

         // ...

         @Override
         public void onNext(Pair<ResponseBody, Integer> pair) {
             Group group = GroupParser.parseList(pair.first.byteStream(), pair.second);


             groups.put(pair.second, group);
         }

         // ...
    });

关于java - 如何将 Observable.fromIterable 中的项目索引传递给 subscribe 方法中的 onNext ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60171708/

相关文章:

java - 当 REST API 在 Retrofit2 RxJava 中返回 401 时刷新访问 token

java - 从 Felix 内部嵌入 Java DB?

android - 强制 Android Activity 始终使用横向模式

java - RxJava - 获取 2 个组合请求的结果数据列表

java - 将 TCP 套接字连接到主机名时捕获超时异常

java - 管理 Android 应用程序主题的最佳方式

Android 与 One Plus 上的 ESP8266 连接(Android 6.0.1)

java - 将 JPA 实体转换为映射

Java:如何取出字符串中的一个字母,同时打印字符串中的其余字母

java - 在 Java 中获取 JRadioButton 的名称