java - 等待并行 RX 订阅者完成

标签 java reactive-programming rx-java

我正在寻找在 rx-java 中等待异步任务完成的最佳方法。

作为一个常见的例子,有一个函数从本地商店获取 ID 列表,然后查询远程系统以获取这些 ID,然后将远程系统结果合并到一个报告中并返回给调用者功能。由于对远程系统的调用很慢,我们希望它们以异步方式完成,我只想在所有调用都已返回且结果已处理后返回。

我发现执行此操作的唯一可靠方法是轮询订阅以检查它是否已取消订阅。但我认为这似乎不是做事的“RX”方式!

作为示例,我使用了 http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/ 中的示例并对其进行了轻微修改,使其成为非安卓系统并展示我的意思。我必须在 main() 方法中添加以下代码才能立即停止它退出。

while (!subscription.isUnsubscribed()) {
    Thread.sleep(100);
}

下面列出了该示例的完整代码(如果您尝试编译它,它取决于 http://square.github.io/retrofit/)

package org.examples;

import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class AsyncExample {

    public static void main(String[] args) throws InterruptedException {
        final Subscription subscription = Observable.from("London", "Paris", "Berlin")
                .flatMap(new Func1<String, Observable<WeatherData>>() {
                    @Override
                    public Observable<WeatherData> call(String s) {
                        return ApiManager.getWeatherData(s);
                    }
                })
                .subscribe(
                        new Action1<WeatherData>() {
                            @Override
                            public void call(WeatherData weatherData) {
                                System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
                            }
                        },
                        new Action1<Throwable>() {
                            @Override
                            public void call(Throwable throwable) {
                                System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
                            }
                        },
                        new Action0() {
                            @Override
                            public void call() {
                                System.out.println(Thread.currentThread().getName() + " - COMPLETED");
                            }
                        }
                );

        // Have to poll subscription to check if its finished - is this the only way to do it?
        while (!subscription.isUnsubscribed()) {
            Thread.sleep(100);
        }
    }
}

class ApiManager {

    private interface ApiManagerService {
        @GET("/weather")
        WeatherData getWeather(@Query("q") String place, @Query("units") String units);
    }

    private static final RestAdapter restAdapter = new RestAdapter.Builder()
            .setEndpoint("http://api.openweathermap.org/data/2.5")
            .build();
    private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);

    public static Observable<WeatherData> getWeatherData(final String city) {
        return Observable.create(new Observable.OnSubscribe<WeatherData>() {
            @Override
            public void call(Subscriber<? super WeatherData> subscriber) {
                try {
                    System.out.println(Thread.currentThread() + " - Getting " + city);
                    subscriber.onNext(apiManager.getWeather(city, "metric"));
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
}

最佳答案

如果您询问如何将异步 Observable 转换为同步 Observable,您可以使用 .toBlocking() 然后使用 .last() 来等待 Observable 完成。例如,以下将不会继续,直到计时器完成甚至尽管计时器在计算线程上执行。

try {
    Observable
            .timer(10, TimeUnit.SECONDS)
            .toBlocking()
            .last();    // Wait for observable to complete. Last item discarded.
} catch (IllegalArgumentException ex) {
    // No items or error.
}

除非绝对必要,否则您可能不应该使用此方法。通常,应用程序终止将由一些其他事件(按键、关闭菜单项单击等)控制。您的网络请求 Observables 将异步完成,您将在 onNext()、onCompleted() 和 onError() 调用中采取行动更新 UI 或显示错误。

此外,Retrofit 的美妙之处在于它内置了对 RxJava 的支持,并将在后台执行网络请求。要使用该支持,请将您的接口(interface)声明为返回一个 Observable。

interface ApiManagerService {
    @GET("/weather")
    Observable<WeatherData> getWeather(@Query("q") String place, @Query("units") String units);
}

这允许您将 getWeatherData() 方法简化为:

public static Observable<WeatherData> getWeatherData(final String city) {
    return apiManager.getWeather(city, "metric");
}

关于java - 等待并行 RX 订阅者完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25163262/

相关文章:

java - SolrHealthIndicator 不带已弃用的 CompositeHealthIndicator

java - 获取哈希表上一对整数(键值)的值

java - 无法获取 ExtendedState JFrame 的大小

javascript - 如何从 rxjs Observable 获取刻度数

java - 如何在 RxJava Vert.x 中结束链式 http 请求?

android - 验证 rxjava 订阅者中的交互

java - Flux.take(x) 或 Flux.all(..) 不会等待所有排放

java - 带有mockito和多个返回值的模拟CSV阅读器

java - Project Reactor - 如何按窗口处理结果

android - 在 RxJava 中将 Singles 链接在一起