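I have been experimenting with RxJava to wrap a legacy listener-style interface. What I came up with seems to work, but the use of Observable.using feels a bit awkward.
The requirements are:
- Subscribe to the underlying service only once per ID.
- Cache the latest value for a given ID and serve it to new subscribers.
- If nothing is listening to an ID, we must unsubscribe from the underlying service.
Is there a better way? Here is what I have.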
static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {
    PublishSubject<MockResponse> mockResponseObservable = PublishSubject.create();
    MockService mockService = new MockService(mockResponse -> mockResponseObservable.onNext(mockResponse));
    final ConcurrentMap<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();

    public Observable<String> getObservable(String id) {
        return Observable.using(() -> subscriptionMap.computeIfAbsent(
                        id,
                        key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(id))
                                .doOnSubscribe(disposable -> mockService.subscribe(id))
                                .doOnDispose(() -> {
                                    mockService.unsubscribe(id);
                                    subscriptionMap.remove(id);
                                })
                                .map(mockResponse -> mockResponse.value)
                                .replay(1)
                                .refCount()),
                observable -> observable,
                observable -> {
                }
        );
    }
}
Best answer
You can use Observable.create.
The code might then look like this:
// ConcurrentHashMap keeps the unsynchronized fast-path read below safe.
final Map<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();
MockService mockService = new MockService();

public Observable<String> getObservable(String id) {
    log.info("looking for root observable");
    Observable<String> existing = subscriptionMap.get(id);
    if (existing != null) {
        log.info("found root observable");
        return existing;
    }
    synchronized (subscriptionMap) {
        // Re-check under the lock in case another thread got here first.
        existing = subscriptionMap.get(id);
        if (existing != null) {
            log.info("Another thread created the observable for us");
            return existing;
        }
        log.info("creating new root observable");
        // The explicit <MockResponse> type witness is required: without it the chained
        // call infers Observable<Object> and mockResponse.id does not compile.
        final Observable<String> responseObservable = Observable.<MockResponse>create(emitter -> {
                    MockServiceListener listener = emitter::onNext;
                    mockService.addListener(listener);
                    // Runs when the upstream is disposed, i.e. after the last subscriber leaves.
                    emitter.setCancellable(() -> {
                        mockService.removeListener(listener);
                        mockService.unsubscribe(id);
                        synchronized (subscriptionMap) {
                            subscriptionMap.remove(id);
                        }
                    });
                    mockService.subscribe(id);
                })
                .filter(mockResponse -> mockResponse.id.equals(id))
                .map(mockResponse -> mockResponse.value)
                .replay(1)
                .refCount();
        subscriptionMap.put(id, responseObservable);
        return responseObservable;
    }
}
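To make the three requirements from the question concrete, here is a minimal plain-Java model of the behaviour that `replay(1).refCount()` is being relied on for. It is only a sketch and deliberately uses no RxJava, so it is not a drop-in replacement for either snippet above. Every name in it (`RefCountedFeeds`, `Upstream`, `listen`, `publish`) is hypothetical and does not come from the original post or from any library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Plain-Java model of the three requirements; all names are hypothetical. */
class RefCountedFeeds {

    /** Stand-in for the legacy service's subscribe/unsubscribe calls. */
    interface Upstream {
        void subscribe(String id);
        void unsubscribe(String id);
    }

    private static final class Feed {
        final List<Consumer<String>> listeners = new ArrayList<>();
        String latest; // most recent value, null until the first one arrives
    }

    private final Map<String, Feed> feeds = new HashMap<>();
    private final Upstream upstream;

    RefCountedFeeds(Upstream upstream) {
        this.upstream = upstream;
    }

    /** Registers a listener and returns a handle that removes it again. */
    synchronized Runnable listen(String id, Consumer<String> listener) {
        Feed feed = feeds.get(id);
        if (feed == null) {
            feed = new Feed();
            feeds.put(id, feed);
            upstream.subscribe(id); // first listener: subscribe upstream once
        }
        feed.listeners.add(listener);
        if (feed.latest != null) {
            listener.accept(feed.latest); // replay the cached value to the newcomer
        }
        return () -> stopListening(id, listener);
    }

    /** To be called from the legacy callback whenever a value for an ID arrives. */
    synchronized void publish(String id, String value) {
        Feed feed = feeds.get(id);
        if (feed == null) {
            return; // nobody is listening to this ID
        }
        feed.latest = value;
        for (Consumer<String> l : new ArrayList<>(feed.listeners)) {
            l.accept(value);
        }
    }

    private synchronized void stopListening(String id, Consumer<String> listener) {
        Feed feed = feeds.get(id);
        if (feed == null || !feed.listeners.remove(listener)) {
            return; // already removed
        }
        if (feed.listeners.isEmpty()) {
            feeds.remove(id);
            upstream.unsubscribe(id); // last listener gone: release the upstream
        }
    }
}
```

In this model the per-ID `Feed` plays the role of the cached entry in `subscriptionMap`. The listener count does the job of `refCount()`, and the `latest` field does the job of `replay(1)`. For simplicity the sketch calls listeners while holding the lock, which a production version would probably want to avoid.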
Regarding java - Is there a cleaner way to adapt a standard observer/listener service using RxJava?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61114945/