java - 如何使用 ReplaySubject 重新运行 CompositeSubscription?

标签 java android rx-java rx-android

我有 CompositeSubscription ,并在那里添加带有 ReplaySubject 的 Subscription

 CompositeSubscription compositeSubscription = new CompositeSubscription();
    ReplaySubject subject = ReplaySubject.create();

 compositeSubscription.add(
                manager.getAllContacts()
                .toList()
                .doOnNext(new Action1<List<Person>>() {
                    @Override
                    public void call(List<Person> persons) {
                        allPersons = persons;
                        Log.e(TAG, "BookContacts: " + "allPersons = " + allPersons.size());
                        setupViewPager();
                    }
                })
                .subscribe(subject));

然后我用这个 ReplaySubject 添加第二个订阅

compositeSubscription.add(Observable.combineLatest(subject,
                                                           (PublishSubject<List<CustomUser>>) execute(
                                                                   manager.getDigitsContacts()),
                                                           new Func2<List<Person>, List<CustomUser>, Object>() {
                                                               @Override
                                                               public Object call(List<Person> persons, List<CustomUser> customUsers) {
                                                                  //... my code with persons and customUsers...
                                                                   return null;
                                                               }
                                                           })
                                            .subscribe());

代码正常运行,在完成 ReplaySubject hasCompleted = true 之后。

但是当我尝试添加第三个 Subscription 时,它不会调用“call()”方法

compositeSubscription.add(Observable.combineLatest(subject,
                                                           (PublishSubject<List<CustomUser>>) execute(
                                                                   manager.getFacebookContacts()),  //<-----manager.getFacebookContacts() is run, but doesn't call call() method
                                                           new Func2<List<Person>, List<CustomUser>, Object>() {
                                                               @Override
                                                               public Object call(List<Person> persons, List<CustomUser> customUsers) {
                                                                  //...this method is not called after  "manager.getFacebookContacts()"
                                                                   return null;
                                                               }
                                                           })
                                            .subscribeOn(Schedulers.newThread())
                                            .subscribe());

如何解决?...因为如果我同时添加订阅,它就可以正常工作。

最佳答案

能否请您将错误回调添加到.subscribe()?我的猜测是第三次 ReplaySubject 溢出了 combineLatest 的缓冲区。您应该使用 .replay().autoConnect(0)

而不是创建主题

CompositeSubscription compositeSubscription = new CompositeSubscription();

Observable<List<Person>> persons = manager.getAllContacts()
    .toList()
    .doOnNext(new Action1<List<Person>>() {
        @Override
        public void call(List<Person> persons) {
            allPersons = persons;
            Log.e(TAG, "BookContacts: " + "allPersons = " + allPersons.size());
            setupViewPager();
        }
    }).replay().autoConnect(0, s -> compositeSubscription.add(s));

然后使用persons代替subject

关于java - 如何使用 ReplaySubject 重新运行 CompositeSubscription?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33611142/

相关文章:

android - 如何配置CountDownTimer

java - android 5.0.1 sdk 包适用于 android 5.0.4 吗?

android - 如何从 rxjava flatmap 调用协程用例

rx-java - RxSwift - .subscribe 与 .subscribeNext 有什么区别?

java - 使顺序处理异步,而不使用 "parallel"运算符

java - AWS Elastic Beanstalk : access System variables

java - 如何从 Tomcat/Java 应用程序创建文件名带有 UTF-8 字符的系统文件?

java - SocketChannel 套接字不读取数据

java - 形态学函数瓶颈的 OpenCV Android 程序

java - 编译错误: variable may not have been initialized