我正在使用reactive-location库。
我的用例是我有一个从可观察对象发出的对象流。这些元素可能每隔几个小时就会排放一次。一旦发出一个项目,我想获取一个位置并使用 zipWith (据我所知)发出一个包含该位置的对象。
问题是:由于对象每隔几个小时才会发射一次,因此我无法保持可观察到的位置很热,因为这会耗尽电池。
所以我需要以下内容:一旦将对象传递到流中,就订阅可观察的位置,一旦获取位置,取消订阅可观察的位置。这必须持续进行。
据我了解,这个转换器负责取消订阅
public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
final BehaviorSubject subject = BehaviorSubject.create();
Observable source = tObservable.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
subject.onNext(t);
}
});
return Observable
.merge(source.takeUntil(subject), subject)
.take(1);
}
};
}
但是一旦新对象被发送到流中,我如何再次订阅?
最佳答案
听起来您需要的是在发出源项目时将其与当前位置组合起来。这里不需要任何花哨的东西。只需在每个源项目上使用 flatMap()
即可将其与位置结合起来。
source.flatMap(item ->
locationProvider
.getLastKnownLocation()
.map(location -> new ItemWithLocation<>(item, location))
);
class ItemWithLocation<T> {
private final T item;
private final Location location;
public ItemWithLocation(T item, Location location) {
this.item = item;
this.location = location;
}
public T getItem() {
return item;
}
public Location getLocation() {
return location;
}
}
编辑:更新了第二个示例。以下内容将订阅位置更新,直到达到特定的精度,然后将其与您的源项目合并。这里的关键是first()
的使用。每当您获得满足您需求的位置时,使用它就会取消对位置提供商的订阅。
LocationRequest request =
LocationRequest
.create()
.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
.setInterval(100);
source.flatMap(item ->
locationProvider
.getUpdatedLocation(request)
.first(location -> location.getAccuracy() < 5.0f)
.map(location -> new ItemWithLocation<>(item, location))
);
关于java - 订阅和取消订阅每个发出的项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33289081/