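java-8 - How to iterate over all items in a list using Observables

Tags: java-8 observable rx-java2

I am learning how to use RxJava. As shown in the code below, I have a List<List<Person>>. What I intend to do is iterate over every list of Person and display how many Person objects are in each list.

I wrote the following code: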

.map(p->p.get(0).getName().map(r->r.toUpperCase()).orElse("NULL_VALUE"))

But as you can see, I always refer to item number 0. How can I refer to all items in the list, as if I were using a for loop, like this:

for (int i = 0; i< length; i++)
    p.get(i)

I hope my question is clear.

Thanks in advance.

Code:

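// RxJava 2 imports (io.reactivex), matching the rx-java2 tag
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;

public class Main {
public static void main(String[] args) {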
    Observable<List<Person>> observables = Observable.create(e-> {
        for(List<Person> p : Main.getPersons()) {
            e.onNext(p);
        }
        e.onComplete();
    });
    observables
    .map(p->p.get(0).getName().map(r->r.toUpperCase()).orElse("NULL_VALUE"))
    .doOnNext(r->System.out.println("r: " + r))
    .observeOn(Schedulers.io())
    .subscribe(new Observer<String>() {
        @Override
        public void onComplete() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable arg0) {
            // Print the failure instead of silently dropping it.
            arg0.printStackTrace();
        }

        @Override
        public void onNext(String arg0) {
            System.out.println("onNextFromObserver: " + arg0);
        }

        @Override
        public void onSubscribe(Disposable arg0) {
            // No setup needed.
        }
    });
}

private static <T> Observable<T> toObservable(T s) {
    return Observable.just(s);
}
private static List<List<Person>> getPersons() {
    return Arrays.asList(
            Arrays.asList(new Person("Sanna1", 59, "EGY"), new Person(null, 59, "EGY"), new Person("Sanna3", 59, null)),
            Arrays.asList(new Person("Mohamed1", 59, "EGY"), new Person(null, 59, "EGY")),
            Arrays.asList(new Person("Ahmed1", 44, "QTR"), new Person("Ahmed2", 44, "QTR"), new Person(null, null, "QTR")),
            Arrays.asList(new Person("Fatma", 29, "KSA")),
            Arrays.asList(new Person("Lobna", 24, "EGY")));
}
}

import java.util.Optional;

public class Person {
// Each attribute is stored as an Optional; the constructor always assigns all three.
private Optional<String> optName;
private Optional<Integer> optAge;
private Optional<String> optAddress;

public Person(String name, Integer age, String address) {
    this.optName = Optional.ofNullable(name);
    this.optAge = Optional.ofNullable(age);
    this.optAddress = Optional.ofNullable(address);
}

public Optional<String> getName() {
    return optName;
}

public void setName(String name) {
    this.optName = Optional.ofNullable(name);
}

public Optional<String> getAddress() {
    return this.optAddress;
}

public void setAddress(String address) {
    this.optAddress = Optional.ofNullable(address);
}

public Optional<Integer> getAge() {
    return this.optAge;
}

public void setAge(int age) {
    this.optAge = Optional.ofNullable(age);
}
}

Update

public static void main(String[] args) {
    Observable<List<Person>> observables = 
    Observable.fromIterable(Main.getPersons());
    observables
    //.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .concatMap(list->Observable.fromIterable(list)
            .map(p->p.getName()
                    .map(r->r.toUpperCase()).orElse("NULL_VALUE")))
    .observeOn(Schedulers.io())
    .blockingSubscribe(new Observer<String>() {

        @Override
        public void onComplete() {
            System.out.println("onComplete: ");
        }

        @Override
        public void onError(Throwable arg0) {
            System.out.println("onError: " + arg0);
        }

        @Override
        public void onNext(String arg0) {
            // The parameter must be String to match Observer<String>.
            System.out.println("onNext: " + arg0);
        }

        @Override
        public void onSubscribe(Disposable arg0) {
            System.out.println("onSubscribe: ");
        }
    });

}

最佳答案

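Use fromIterable and concatMap. fromIterable turns each inner list into its own Observable of Person, and concatMap emits the items of those inner Observables one list after another, in order: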

public static void main(String[] args) {
    Observable<List<Person>> observables =
        Observable.fromIterable(Main.getPersons());

    observables
    .concatMap(personList -> 
        Observable.fromIterable(personList)
        .map(aPerson -> 
            aPerson.getName()
             .map(name -> name.toUpperCase()).orElse("NULL_VALUE")
        )
    )
    .doOnNext(aName -> System.out.println("aName: " + aName))
    .observeOn(Schedulers.io())
    .blockingSubscribe(new Observer<String>() {
        @Override
        public void onComplete() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable ex) {
            ex.printStackTrace();
        }

        @Override
        public void onNext(String item) {
            System.out.println("onNextFromObserver: " + item);
        }

        @Override
        public void onSubscribe(Disposable disposable) {
        }
    });
}
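
For intuition, the concatMap pipeline above emits the same sequence that a pair of nested for loops would visit: the outer loop walks the lists and the inner loop walks the items of each list. The following is a minimal plain-Java sketch of that equivalence, not RxJava code. The class FlattenSketch, its methods, and the sample data (only the names from the first two lists of getPersons(), modelled as Optional<String>) are assumptions made for illustration. It also shows the per-list count that the question originally asked about.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper class: a loop-based picture of what the pipeline emits.
class FlattenSketch {

    // Outer loop plays the role of Observable.fromIterable(lists);
    // inner loop plays the role of fromIterable(list) inside concatMap.
    static List<String> upperNames(List<List<Optional<String>>> lists) {
        List<String> out = new ArrayList<>();
        for (List<Optional<String>> list : lists) {
            for (Optional<String> name : list) {
                out.add(name.map(String::toUpperCase).orElse("NULL_VALUE"));
            }
        }
        return out;
    }

    // One size per inner list: the count the question set out to display.
    static List<Integer> sizes(List<List<Optional<String>>> lists) {
        List<Integer> out = new ArrayList<>();
        for (List<Optional<String>> list : lists) {
            out.add(list.size());
        }
        return out;
    }

    // Assumed sample data: names taken from the first two lists of getPersons().
    static List<List<Optional<String>>> sample() {
        return Arrays.asList(
            Arrays.asList(Optional.of("Sanna1"), Optional.<String>empty(), Optional.of("Sanna3")),
            Arrays.asList(Optional.of("Mohamed1"), Optional.<String>empty()));
    }

    public static void main(String[] args) {
        System.out.println(upperNames(sample())); // [SANNA1, NULL_VALUE, SANNA3, MOHAMED1, NULL_VALUE]
        System.out.println(sizes(sample()));      // [3, 2]
    }
}
```

Because concatMap preserves the order of the inner sequences, the flattened names appear list by list, exactly as in the nested loops. If only the counts are needed, mapping each inner list to its size before any flattening is likely the simplest route.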

This question, "java-8 - How to iterate over all items in a list using Observables", comes from a similar question found on Stack Overflow: https://stackoverflow.com/questions/53125463/
