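Here is an interesting little RxJava puzzle I am working on. Suppose I have an Observable<List<Parent>> infiniteParentListStream that is infinite, and each Parent has an Observable<List<Child>> infiniteChildListStream property that is also infinite.
I want to take all the Parent instances in each emitted List<Parent> and merge each of their emitted List<Child> items into one whole List<Child> that reflects all children of all parents.
The fact that the Observable<List<Child>> infiniteChildListStream property of Parent is infinite makes the toList() step a bit challenging: toList() emits only when its upstream completes, and an infinite stream never does.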
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.subjects.BehaviorSubject;

public final class NestedInfiniteTest {
    private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
    private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);

    public static void main(String[] args) {
        Observable<List<Parent>> infiniteParentListStream = parentSubject
                .map(i -> Arrays.asList(new Parent(), new Parent(), new Parent()))
                .cache(1);

        // Problem: the child streams never complete, so toList() never emits.
        Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
                Observable.from(parentList)
                        .flatMap(p -> p.getInfiniteChildListStream().flatMap(Observable::from)).toList()
        ).cache(1);

        allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
    }

    private static final class Parent {
        private final Observable<List<Child>> infiniteChildListStream = childSubject
                .map(i -> Arrays.asList(new Child(), new Child(), new Child())).cache(1);

        public Observable<List<Child>> getInfiniteChildListStream() {
            return infiniteChildListStream;
        }
    }

    private static final class Child {
    }
}
Of course, one solution I found is to make infiniteChildListStream finite by calling first(). But this is less than ideal, because the result no longer updates.
Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
        Observable.from(parentList)
                .flatMap(p -> p.getInfiniteChildListStream().first().flatMap(Observable::from)).toList()
).cache(1);
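I have a feeling there is a way to solve this by calling Observable.create() manually or with some flatMap() trick. Is there a better way to do this that keeps everything reactive to the infinite sources? In my real application, outside this SSCCE, these observables are infinite because the data sources driving Parent and Child can change and emit new values...
I suppose the root of my question is: how do I take multiple infinite Observable<List<T>> and merge them into a single Observable<List<T>>?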
Best answer
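I think I solved this by using Observable.combineLatest(). To strengthen the test, I also modified the source observables to create List sizes that depend on the integer value pushed by the subject. This seems to work quite well.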
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import rx.Observable;
import rx.functions.FuncN;
import rx.subjects.BehaviorSubject;

public final class NestedInfiniteTest {
    private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
    private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);

    public static void main(String[] args) {
        // The number of parents follows the latest integer pushed to parentSubject.
        Observable<List<Parent>> infiniteParentListStream = parentSubject
                .map(i -> IntStream.range(0, i).mapToObj(val -> new Parent()).collect(Collectors.toList()))
                .cache(1);

        Observable<List<Child>> allCurrentChildren = infiniteParentListStream.flatMap(parentList ->
                Observable.<Observable<List<Child>>>create(s -> {
                    parentList.stream().map(Parent::getInfiniteChildListStream).forEach(s::onNext);
                    s.onCompleted();
                })
                .toList() // List<Observable<List<Child>>>
                .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, new FuncN<List<Child>>() {
                    @Override
                    public List<Child> call(Object... args) {
                        // args holds the latest List<Child> from each parent.
                        ArrayList<Child> list = new ArrayList<>();
                        for (Object obj : args) {
                            list.addAll((List<Child>) obj);
                        }
                        return list;
                    }
                }))
        );

        allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));

        childSubject.onNext(10);
        parentSubject.onNext(5);
        childSubject.onNext(2);
    }

    private static final class Parent {
        // The number of children follows the latest integer pushed to childSubject.
        private final Observable<List<Child>> infiniteChildListStream = childSubject
                .map(i -> IntStream.range(0, i).mapToObj(val -> new Child()).collect(Collectors.toList())).cache(1);

        public Observable<List<Child>> getInfiniteChildListStream() {
            return infiniteChildListStream;
        }
    }

    private static final class Child {
    }
}
Output:
WHOLE CHILD LIST SIZE: 1 //parentSubject = 1, childSubject = 1
WHOLE CHILD LIST SIZE: 10 //parentSubject = 1, childSubject = 10
WHOLE CHILD LIST SIZE: 50 //parentSubject = 5, childSubject = 10
WHOLE CHILD LIST SIZE: 2 //parentSubject = 5, childSubject = 2, adjusting
WHOLE CHILD LIST SIZE: 42 //adjusting
WHOLE CHILD LIST SIZE: 34 //adjusting
WHOLE CHILD LIST SIZE: 26 //adjusting
WHOLE CHILD LIST SIZE: 18 //adjusting
WHOLE CHILD LIST SIZE: 10 //parentSubject = 5, childSubject = 2, done!
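The intermediate "adjusting" sizes follow from how combineLatest behaves: it remembers the latest list from each source and re-emits the combined result every time any single source changes. The following is a minimal plain-Java sketch of that bookkeeping, with no RxJava dependency. The names LatestListCombiner, update, and sizesAfterEachUpdate are hypothetical and exist only for this illustration. It assumes five parents whose child lists shrink from 10 to 2 one parent at a time, which matches the scenario in the output above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of combineLatest-style bookkeeping; this is not RxJava code.
public final class LatestListCombiner {
    // Latest list seen from each source; null means that source has not emitted yet.
    private final List<List<Integer>> latest;

    public LatestListCombiner(int sourceCount) {
        latest = new ArrayList<>(Collections.nCopies(sourceCount, (List<Integer>) null));
    }

    // Records a new list for one source and returns the concatenation of all
    // latest lists, or null while at least one source has not emitted yet.
    public List<Integer> update(int sourceIndex, List<Integer> newList) {
        latest.set(sourceIndex, newList);
        List<Integer> combined = new ArrayList<>();
        for (List<Integer> list : latest) {
            if (list == null) {
                return null;
            }
            combined.addAll(list);
        }
        return combined;
    }

    // Assumed scenario: every source first emits a list of oldSize items,
    // then each source in turn emits a list of newSize items.
    public static List<Integer> sizesAfterEachUpdate(int sources, int oldSize, int newSize) {
        LatestListCombiner combiner = new LatestListCombiner(sources);
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < sources; i++) {
            List<Integer> combined = combiner.update(i, Collections.nCopies(oldSize, 0));
            if (combined != null) {
                sizes.add(combined.size());
            }
        }
        for (int i = 0; i < sources; i++) {
            sizes.add(combiner.update(i, Collections.nCopies(newSize, 0)).size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Prints [50, 42, 34, 26, 18, 10]
        System.out.println(sizesAfterEachUpdate(5, 10, 2));
    }
}
```

This sketch does not cover the leading 2 in the output. That value appears to come from the earlier single-parent list, which flatMap keeps subscribed after the new parent list arrives. switchMap is the usual RxJava operator for dropping such stale inner subscriptions, though the answer above does not use it.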
Update: I created a Transformer to perform this task.
public static class CombinedListTransformer<T,R> implements Observable.Transformer<List<T>,List<R>> {
    private final Func1<T,Observable<List<R>>> listMapper;

    public CombinedListTransformer(Func1<T,Observable<List<R>>> listMapper) {
        this.listMapper = listMapper;
    }

    @Override
    public Observable<List<R>> call(Observable<List<T>> sourceList) {
        return sourceList.flatMap(sl ->
                Observable.from(sl).map(t -> listMapper.call(t)).toList() // List<Observable<List<R>>>
                        .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, args -> {
                            ArrayList<R> list = new ArrayList<>();
                            for (Object obj : args) {
                                list.addAll((List<R>) obj);
                            }
                            return list;
                        }))
        );
    }
}
Regarding java - RxJava: merging multiple, infinite Observable<List<T>>?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32292509/