java - 是否有更干净的方法使用 RxJava 来适应标准观察者/监听器服务?

标签 java rx-java rx-java2

我一直在尝试使用 RxJava 包装旧式监听器接口(interface)。我想出的似乎可行,但是 Observable.using 的使用感觉有点尴尬。

要求是:

  1. 每个 ID 只能订阅一次底层服务。
  2. 应缓存给定 ID 的最新值并将其提供给新订阅者。
  3. 如果没有任何东西在监听 ID,我们必须取消订阅底层服务。

还有更好的办法吗?以下是我得到的。

static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {
    PublishSubject<MockResponse> mockResponseObservable = PublishSubject.create();
    MockService mockService = new MockService(mockResponse -> mockResponseObservable.onNext(mockResponse));
    final ConcurrentMap<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();

    public Observable<String> getObservable(String id) {
        return Observable.using(() -> subscriptionMap.computeIfAbsent(
                id,
                key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(id))
                        .doOnSubscribe(disposable -> mockService.subscribe(id))
                        .doOnDispose(() -> {
                            mockService.unsubscribe(id);
                            subscriptionMap.remove(id);
                        })
                        .map(mockResponse -> mockResponse.value)
                        .replay(1)
                        .refCount()),
                observable -> observable,
                observable -> {
                }
        );
    }
}

最佳答案

您可以使用Observable.create

所以代码可能看起来像这样

final Map<String, Observable<String>> subscriptionMap = new HashMap<>();
MockService mockService = new MockService();

public Observable<String> getObservable(String id) {
    log.info("looking for root observable");
    if (subscriptionMap.containsKey(id)) {
        log.info("found root observable");
        return subscriptionMap.get(id);
    } else {
        synchronized (subscriptionMap) {
            if (!subscriptionMap.containsKey(id)) {
                log.info("creating new root observable");
                final Observable<String> responseObservable = Observable.create(emitter -> {
                    MockServiceListener listener = emitter::onNext;
                    mockService.addListener(listener);
                    emitter.setCancellable(() -> {
                        mockServices.removeListener(listener);
                        mockService.unsubscribe(id);
                        synchronized (subscriptionMap) {
                            subscriptionMap.remove(id);
                        }
                    });
                    mockService.subscribe(id);
                })
                        .filter(mockResponse -> mockResponse.id.equals(id))
                        .map(mockResponse -> mockResponse.value)
                        .replay(1)
                        .refCount();

                subscriptionMap.put(id, responseObservable);
            } else {
                log.info("Another thread created the observable for us");
            }
            return subscriptionMap.get(id);
        }
    }
}

关于java - 是否有更干净的方法使用 RxJava 来适应标准观察者/监听器服务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61114945/

相关文章:

android - 如何进行两次 Retrofit 调用并合并结果?

java - RxJava 仅延迟部分流程的重复

rx-java - RxJava2,Observable/Flowable 的 2 个订阅者,但 onNext 在任何一个上被调用

java - inputStream读取方法不断读取0

java - 'Contribution' 附近的语法不正确

java - RxJava - 按顺序上传文件 - 在调用 onNext 时发出下一个项目

rx-java - 缓存最后发出的项目 RxJava Operator

java.lang.NoClassDefFoundError : io/reactivex/subjects/Subject when building jar in Intellij Idea (gradle + JavaFX 11)

java - 使用 Java 读取多个属性 CouchDB

java - 将结果集转换为字符串数组