java - RxJava : dynamically create Observables and send the final resut as Observable

标签 java rx-java reactive-programming

我正在使用 RxJava,我想在其中根据某些条件动态创建多个 Observable。完成创建后,我想对 observable 返回的不同值进行一些处理,然后作为我可以订阅的单个 Observable 发送。这是我的代码:

List<String> valueList = .... 

List<Observable<String>> listOfObservables = new ArrayList<Observable<String>>();

for(int i =; i <valueList.size(); i++){
        listOfObservables.add(new SomeClass.doOperation(valueList(i)));
        // SomeClass.doOperation will return an Observable<String>
    }

return Observable.merge(listOfObservables);

但是在这里,我想对 listOfObservable 中不同 Observables 发出的值做一些操作。最后将其作为单个 Observable<String> 返回

喜欢 Observable.zip() , 我可以这样做

return Observable.zip(observable1, observable2, (string1, string2) -> {
            // joining final string here
            return string1 + string2;

但我知道这里有多少参数。请告诉我如何实现这一目标。

最佳答案

使用zip 重载that takes a variable number of arguments , 它有一个签名

<R> Observable<R> zip(Iterable<? extends Observable<?>> ws,
                      FuncN<? extends R> zipFunction)

示例用法:

List<String> valueList = .... 

return Observable.from(valueList)
    .map(string -> SomeClass.doOperationThatReturnsObservable(string))
    .toList()
    .flatMap(listOfObs -> Observable.zip(listOfObs, (Object[] results) -> {
       // do something with the strings in the array.
       return Arrays.stream(results)
                    .map(Object::toString)
                    .collect(Collectors.joining(","));
    }));

关于java - RxJava : dynamically create Observables and send the final resut as Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39200322/

相关文章:

java - Clojure 能否从现有流程中捕获标准?

java - Eclipse UTF-8-奇怪的字符

java - 安卓 RxJava 2 : What is the difference between fromCallable and Just?

r - 如何在 Shiny 的 react 表达式中使用以前的 react 值?

macos - Quartz Composter —重新创建Audioskop(根据音频输入调整视频时间)

java - 适用于 Mac OSX Catalina 的 Android studio 3.5 在索引进程中崩溃

java - 等待对象创建或如果它已经存在则立即访问它

java.lang.IllegalStateException : Expected to be called on the main thread but was RxCachedThreadScheduler-1 错误

java - 不鼓励使用 Mono<Optional<T>> 吗?

java - 将预先编写的 SpringMVC + Hibernate 部署到 AWS