java - 订阅后合并PublishSubject

标签 java rx-java

这是我想要实现的目标:

PublishSubject<Integer> subject = PublishSubject.create();
    subject.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            System.out.println("item = " + integer);
        }
    });
    subject.mergeWith(Observable.just(1, 2));
    subject.onNext(3);
    /* 
    expected: 
    item = 1
    item = 2
    item = 3
    but received :
    item = 3
    */

我知道我可以做这样的事情:

PublishSubject.merge(subject, Observable.just(1,2)).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            System.out.println("item = " + integer); // emits 1 2 3
        }
    });

但问题是用户已经订阅了该主题。 我找不到一种优雅的方式。

编辑: 由于主体既是观察者又是订阅者,你可以这样做:

final PublishSubject<Integer> subject = PublishSubject.create();
    subject.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            System.out.println("item = " + integer);
        }
    });
    Observable.just(1,2).subscribe(subject);
    subject.onNext(3);//subscription
    /*
    expected:
    item = 1
    item = 2
    item = 3
    but received :
    item = 1
    item = 2
    */

最佳答案

将代码改为

    Subject<Integer> subject = PublishSubject.create();
    subject.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            System.out.println("item = " + integer);
        }
    });
    Observable.just(1,2).doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer e) throws Exception {
            subject.onNext(e);
        }
    }).subscribe();

    subject.onNext(3);

但仍然如此。您需要订阅此 Observable。

关于java - 订阅后合并PublishSubject,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43249387/

相关文章:

android - 使用 RxJava 从 2 个可观察对象中获取一个结果

java - 由 PaintComponent 写入的 JPasswordField 文本颜色

java - 从字符串中提取日期

java - 来自资源的大图像获取异常

java - 在 RxJava 中超时取消任务

rx-java - 链接具有不同发射类型的多个可观察量

java - Java是建立店铺租赁服务的好语言吗?

使用正则表达式读取和转换Java文件

java - 如何在Retrofit2/rxJava中刷新ACCESS-TOKEN

java - 在 RxJava 中,RxJavaPlugins.setErrorHandler 和 Subscribe onError 有什么区别?