android - RxJava2 : execute an async function for every item in a list and wait for callback

标签 android asynchronous rx-java rx-java2

我正在为 RxJava2 苦苦挣扎。我想对列表的每个项目执行一个函数。这个函数:

public void function(final Result result) {

    FirebaseFirestore.getInstance().collection(COLLECTION_NAME).document(result.getId()).get().addOnSuccessListener(new OnSuccessListener<DocumentSnapshot>() {
        @Override
        public void onSuccess(DocumentSnapshot documentSnapshot) {
            // do some operation
        }
    });
}

此函数是异步的并使用 FirebaseFirestore。

所以我尝试在我的列表中使用 RxJava2 来为每个项目调用该函数:

Observable.fromIterable(resultList)
                    .concatMap(result -> Observable.fromCallable(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            function(result);
                            return "ok";
                        }
                    }))
                    .subscribe(r -> {
                        // do some operation when all firebase async tasks are done
                    });

concatMap 工作并且为列表的每个项目调用该函数。问题是当所有 firebase 异步任务完成后我需要一个回调。

如有任何帮助,我们将不胜感激。

最佳答案

我将尝试绘制一个可能的解决方案:

public class Callback implements OnSuccessListener<DocumentSnapshot> {
     private final ObservableEmitter<DocumentSnapshot> emitter;
     private final boolean last;

     public Callback(boolean lastvalue, ObservableEmitter<DocumentSnapshot> e) {
        this.last = lastvalue;
        this.emitter = e;
     }

     @Override
     public void onSuccess(DocumentSnapshot value) {
         emitter.onNext(value);
         if (last) {
             emitter.onComplete();
         }
     }
}


Observable<DocumentSnapshot> observable = Observable.create(new ObservableOnSubscribe<DocumentSnapshot>() {
        @Override
        public void subscribe(ObservableEmitter<DocumentSnapshot> e) throws Exception {
            int i = 1;
            for (Result result : resultList) {
                /* callback object now knows which is the last request so it can emit the onComplete */
                Callback callbackInstance = new Callback(resultList.size() == i, e);
                i++;
                FirebaseFirestore.getInstance().collection(COLLECTION_NAME)
                            .document(result.getId()).get().addOnSuccessListener(callbackInstance);
                }
            }
        });

然后,当订阅者的 onComplete 操作被触发时,所有对 Firebase 的请求都应该完成。

关于android - RxJava2 : execute an async function for every item in a list and wait for callback,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51572018/

相关文章:

安卓 : how to adjust custom list views?

android - 如何在 android 中处理 NullPointerException?

android - 使用 gradle 从存储库下载 apk

javascript - 优化异步调用(.then() 链接)

java - RxJava - 延迟和延迟订阅之间有什么区别?

android - 带框架布局的抽屉导航

javascript - 网络抓取和 promise

java - 我们是否应该始终对移动后端使用异步 JAX-RS 资源?

android - 如何在 RxJava2 中使用带有 lambda 表达式的 DisposableObserver

c# - 等价于 RxJava