java - 订阅和取消订阅每个发出的项目

标签 java android reactive-programming rx-java rx-android

我正在使用reactive-location库。

我的用例是我有一个从可观察对象发出的对象流。这些元素可能每隔几个小时就会排放一次。一旦发出一个项目,我想获取一个位置并使用 zipWith (据我所知)发出一个包含该位置的对象。

问题是:由于对象每隔几个小时才会发射一次,因此我无法保持可观察到的位置很热,因为这会耗尽电池。

所以我需要以下内容:一旦将对象传递到流中,就订阅可观察的位置,一旦获取位置,取消订阅可观察的位置。这必须持续进行。

据我了解,这个转换器负责取消订阅

public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
    return new Observable.Transformer<T, T>() {

        @Override
        public Observable<T> call(Observable<T> tObservable) {
            final BehaviorSubject subject = BehaviorSubject.create();
            Observable source = tObservable.doOnNext(new Action1<T>() {
                @Override
                public void call(T t) {
                    subject.onNext(t);
                }
            });
            return Observable
                    .merge(source.takeUntil(subject), subject)
                    .take(1);
        }

    };
}

但是一旦新对象被发送到流中,我如何再次订阅?

最佳答案

听起来您需要的是在发出源项目时将其与当前位置组合起来。这里不需要任何花哨的东西。只需在每个源项目上使用 flatMap() 即可将其与位置结合起来。

source.flatMap(item ->
        locationProvider
                .getLastKnownLocation()
                .map(location -> new ItemWithLocation<>(item, location))
);

class ItemWithLocation<T> {
    private final T item;
    private final Location location;

    public ItemWithLocation(T item, Location location) {
        this.item = item;
        this.location = location;
    }

    public T getItem() {
        return item;
    }

    public Location getLocation() {
        return location;
    }
}

编辑:更新了第二个示例。以下内容将订阅位置更新,直到达到特定的精度,然后将其与您的源项目合并。这里的关键是first()的使用。每当您获得满足您需求的位置时,使用它就会取消对位置提供商的订阅。

LocationRequest request = 
        LocationRequest
            .create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);

source.flatMap(item ->
        locationProvider
                .getUpdatedLocation(request)
                .first(location -> location.getAccuracy() < 5.0f)
                .map(location -> new ItemWithLocation<>(item, location))
);

关于java - 订阅和取消订阅每个发出的项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33289081/

相关文章:

java - 对卡在 CLOSE_WAIT 状态的连接进行故障排除

java - 允许自动查询的网络搜索 API

java - Spring WebClient 下载图像

java - 如何从项目中删除jar文件?

java - 为什么 Android map 指南针演示使用 SmoothCanvas 类的委托(delegate)?

java - 如何使用 gradle 构建 java hidl 客户端

android - 为什么Project的build.gradle没有显示?

android - 使用自定义流布局时,点击事件不会在所有设备上传播

javascript - 使用 RxJS 合并未知数量的可观察对象

android - 函数不在后台线程上执行