java - 将多个异步 Observable<List> 减少为一个 Observable<List>

标签 java multithreading rx-java reactive-programming rx-android

所以这就是我想做的:
对于每个事件,我想调用 Service.getList() 10 次,然后将 10 个列表合并为一个。

现在我尝试了这两种方法,它们都在 UT 中工作,但在实际应用程序中失败(我猜我在实际应用程序中给定异步 http 调用时没有正确执行归约操作)。 对于这两种情况,我都看不到 reduce() 之后的日志,onNext()onError() 都看不到,所以我猜测 reduce() 操作未完成。

尝试#1:

public Observable<List<Event>> getEventsForLocation(Location location) {
List<Observable<List<Event>>> obs = new ArrayList<>();
for (Venue v : location.getVenues()) {
     obs.add(getEventsForVenue(v)); //does one http call, returns Observable<List<Event>> 
}
return Observable.concat(Observable.from(obs))
            .reduce((List<Event>) new ArrayList<Event>(), (events, events2) -> {
                events.addAll(events2);
                return events;
            })
            .doOnNext(events -> Log.d("reduce ", events.toString()))
            .doOnError(throwable -> Log.e("reduce error", throwable.toString()));}

尝试#2:

public Observable<List<Event>> getEventsForLocation(Location location) {
return Observable
        .from(location.getVenues())
        .flatMap(venue -> getEventsForVenue(venue)) //does one http call, returns Observable<List<Event>> 
        .reduce((List<Event>) new ArrayList<Event>(), (events, events2) -> {
            events.addAll(events2);
            return events;
        })
       .doOnNext(events -> Log.d("service", "total events " + events.toString()))
       .doOnError(t -> Log.e("service", "total events error2 " + t.toString()));}

以及两种方法的 UT:

    @Test
    public void getEventsForLocation() {
        Location loc = new Location("test", newArrayList(new Venue("v1", "url1"),new Venue("v2", "url2")));

        when(httpGateway.downloadWebPage(Mockito.anyString())).thenReturn(
                Observable.just(readResource("eventsForVenue1.html")),
                Observable.just(readResource("eventsForVenue2.html"))
        );

        TestSubscriber<List<Event>> probe = new TestSubscriber<>();
        service.getEventsForLocation(loc).subscribe(probe);

        probe.assertNoErrors();

        //assert the next event containts contents of all lists
        List<Event> events = probe.getOnNextEvents().get(0);

        //first list
        Assert.assertEquals("Unexpected title", "event1", events.get(0).getName());
        Assert.assertEquals("Unexpected artist", "artist1", events.get(0).getArtist());
        //second list
        Assert.assertEquals("Unexpected title", "event2", events.get(1).getName());
        Assert.assertEquals("Unexpected artist", "artist2", events.get(1).getArtist());
    }

更新

这里是更完整的代码,以及调度程序。

Observable
                .just(loc)
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .flatMap(location -> service.getEventsForLocation(location))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver();

最佳答案

如果你不关心列表中的最终顺序,你可以只使用 from + flatMap + flatMapIterable + toList:

Observable.from(location.getVenues())
.flatMap(venue -> getEventsForVenue(venue))
.flatMapIterable(list -> list)
.toList();

如果顺序很重要并且您想“并行”执行 getEventsForVenue,您可以用 concatMapEager 替换 flatMap:

Observable.from(location.getVenues())
.concatMapEager(venue -> getEventsForVenue(venue))
.concatMapIterable(list -> list)
.toList();

关于java - 将多个异步 Observable<List> 减少为一个 Observable<List>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36678121/

相关文章:

java - 使用 Spring MVC 和 JSP 进行响应分块 : how to write contents of JSP into response?

java - 高效的EnumSet + List

java - Css Selector 有特定的属性?

python - 如何在多线程中运行TensorRT?

c++ - 与 omp 并行

android - 如何订阅点击事件以便异常情况不会取消订阅?

java - 模块化 Spring 项目的架构

c++ - 在 `std::future` 中运行循环直到破坏 - 惯用方式?

java - 刷新 token rxjava+retrofir2

android - PublishSubject 与 Kotlin 协程(流程)