java - 聚合已完成的线程并在超时后发送响应 rX Java

标签 java multithreading rx-java

我有一个用例,我需要聚合来自多个 Observable 对象的已完成线程响应并返回给客户端。我的问题是如何使用 rX Java 来实现它。这里我写了一个代码片段,但这个代码片段的问题是超时后不会返回任何内容。

Observable<AggregateResponse> aggregateResponse = Observable.
   zip(callServiceA(endpoint), callServiceB(endpoint), callServiceC(endpoint),
        (Mashup resultA, Mashup resultB, Mashup resultC) -> {
            AggregateResponse result = new AggregateResponse();
            result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName());
            return result;
        }).timeout(5, TimeUnit.SECONDS);

订阅者

aggregateResponse.subscribe(new Subscriber<AggregateResponse>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable throwable) {
            //Timeout execute this rather than aggregating the finished tasks
            System.out.println(throwable.getMessage());
            System.out.println(throwable.getClass());
        }

        @Override
        public void onNext(AggregateResponse response) {
            asyncResponse.resume(response);
        }
    });

最佳答案

你需要在每个Observable上放置timeout运算符,zip将等待所有Observables发出一个值,然后再发出结果,因此,如果只有其中一个需要更长的时间,而其他的已经发出,则在压缩的 Observable 有机会发出之前,您将使用 timeout (使用 onError)来减少流.

假设您想忽略超时源,同时保留其余部分,您应该做的是向每个 Observable 添加超时运算符,并添加错误处理,例如 onErrorReturn对于每一个,错误返回都可以返回某种“空”结果(在 RxJava2 中不能使用 null),并且当聚合结果时忽略这些空结果:

Observable<AggregateResponse> aggregateResponse = Observable.
            zip(callServiceA(endpoint)
                            .timeout(5, TimeUnit.SECONDS)
                            .onErrorReturn(throwable -> new Mashup()),
                    callServiceB(endpoint)
                            .timeout(5, TimeUnit.SECONDS)
                            .onErrorReturn(throwable -> new Mashup()),
                    callServiceC(endpoint)
                            .timeout(5, TimeUnit.SECONDS)
                            .onErrorReturn(throwable -> new Mashup()),
                    (Mashup resultA, Mashup resultB, Mashup resultC) -> {
                        AggregateResponse result = new AggregateResponse();
                        result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName());
                        return result;
                    });

关于java - 聚合已完成的线程并在超时后发送响应 rX Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45604378/

相关文章:

java - 初始化 ArrayList 的编码实践

c# - 我如何确保任何线程都不会无限期地等待?

java - 如何通知 Android 线程执行方法?

c# - 如何在工作服务器上实现类似IIS的线程池

java - 间隔调度的 RxJava observables 花费的时间比指定的要多

java - RxJava : return random objects from Observable correctly

java - 将外部文件(非 jar)包含到可执行 jar

java - 我的选择查询在使用 MySql 数据库的 Spring MVC 3 中不起作用

java - 在我的响应中获取 302 重定向的 HttpException

java - 如何扩展 PrintWriter?