java - 如何在 RxJava 中让多个订阅者订阅一个分组的 Observable?

标签 java system.reactive rx-java

这是使用 RxJava 版本 0.19.6

在 groupBy 操作之外,可以创建一个由以下代码描述的管道,例如,根据某些条件从 Observable 中选择一条记录,或者选择满足某些替代条件的第一条记录:

Observable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS).take(10);
Observable<Long> filter1 = observable.filter(new Func1<Long, Boolean>() {
    @Override
    public Boolean call(Long aLong) {
        return 5 == aLong % 5;
    }
});
Observable<Long> filter2 = observable.filter(new Func1<Long, Boolean>() {
    @Override
    public Boolean call(Long aLong) {
        return 2 == aLong % 5;
    }
});
BlockingObservable.from(Observable.concat(filter1, filter2).first()).forEach(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
        System.out.println(aLong);
    }
});

...不幸的是,由于对 GroupedObservable 的限制,同类过程似乎无法在分组上下文中运行:

BlockingObservable.from(observable.groupBy(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
        return aLong % 5;
    }
}).flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Long>>() {
    @Override
    public Observable<Long> call(GroupedObservable<Long, Long> in) {
        Observable<Long> filter1 = in.filter(new Func1<Long, Boolean>() {
            @Override
            public Boolean call(Long aLong) {
                return 5 == aLong % 5;
            }
        });
        Observable<Long> filter2 = in.filter(new Func1<Long, Boolean>() {
            @Override
            public Boolean call(Long aLong) {
                return 2 == aLong % 5;
            }
        });
        return Observable.concat(filter1, filter2).first();
    }
})).forEach(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
        System.out.println(aLong);
    }
});

...导致多订阅者异常(线程“主”java.lang.IllegalStateException 中的异常:只允许一个订阅者!)。

我是否遗漏了一些明显的解决方法?在这种情况下,我曾尝试使用 ConnectableObservables 来呈现单个订阅者的外观,但这些尝试也都失败了(肯定是由于我的无知)。


在相关说明中,groupByUntil 似乎也为您提供了对 GroupedObservable 的引用,如果我实际上尝试使用它来确定何时关闭窗口,这让我很头疼地提示多个订阅者。在这里,我再次确信我忽略了一些明显的东西,因为 API 显然希望人们使用 GroupedObservable!

最佳答案

您可以在 GroupedObservable 上使用 .cache()。

final Observable<Long> inCached = in.cache();

然后在您的过滤器中使用生成的 Observable。

Observable<Long> filter1 = inCached.filter(new Func1<Long, Boolean>() {
        @Override
        public Boolean call(Long aLong) {
            return 5 == aLong % 5;
        }
    });

这样每个订阅者都会看到相同的项目,但 GroupedObservable 只会有一个订阅者。

关于java - 如何在 RxJava 中让多个订阅者订阅一个分组的 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25374403/

相关文章:

java - Groovy脚本中创建类遇到错误

java - ADF 应用程序部署两次以运行

.net - 当满足某个值或条件时,如何触发 Observable 数据流的缓冲?

java - Spring Boot 响应实体不返回 JSON

java - Android 上的 Proguard 和使用 Json

c# - Rx observable,如果某个超时到期,它会发布一个值

c# - 基于推和基于拉的结构(如 IEnumerable<T> 和 IObservable<T>)之间有什么区别

android - RXJava - 缓冲 observable 1 直到 observable 2 发出一个项目

java - RxJava-我什么时候应该关注取消订阅?

android - 如何在最后一个订阅者退订后延迟拆除共享的、无限的 Observables