java - RxJava2 Flowable 不调用 Subscriber 中的 onNext 方法

标签 java android rx-java rx-java2

我是 rxjava 的新手,我对 Flowable 有疑问。 当 onComplete 方法在 flatMap 中被调用时,订阅者中的 onNext 方法没有被调用。这里有什么问题?有什么建议吗?

        ResourceSubscriber<List<Song>> subscriber = new ResourceSubscriber<List<Song>>() {
        @Override
        public void onStart() {
            Log.e(TAG, "onStart");
            mView.showProgress();
        }

        @Override
        public void onNext(List<Song> songs) {
            mView.onLocalMusicLoaded(songs);
            mView.emptyView(songs.isEmpty());
            Log.e(TAG, "onNext");
            //Not call here after onComplete called
        }

        @Override
        public void onError(Throwable t) {
            mView.hideProgress();
            Log.e(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            mView.hideProgress();
            Log.e(TAG, "onComplete");
        }
    };


    Disposable disposable = Flowable.just(cursor)
            .flatMap(new Function<Cursor, Publisher<List<Song>>>() {
                @Override
                public Publisher<List<Song>> apply(@NonNull Cursor cursor) throws Exception {
                    final List<Song> songs = new ArrayList<>();
                    if (cursor != null && cursor.getCount() > 0) {
                        cursor.moveToFirst();
                        do {
                            Song song = cursorToMusic(cursor);
                            songs.add(song);
                        } while (cursor.moveToNext());
                    }
                    return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
                        @Override
                        public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
                            e.onNext(songs);
                            e.onComplete();
                        }
                    }, BackpressureStrategy.LATEST);
                }
            })
            .doOnNext(new Consumer<List<Song>>() {
                @Override
                public void accept(@NonNull List<Song> songs) throws Exception {
                    Log.e(TAG, "onLoadFinished doOnNext: " + songs.size());
                    Collections.sort(songs, new Comparator<Song>() {
                        @Override
                        public int compare(Song left, Song right) {
                            return left.getDisplayName().compareTo(right.getDisplayName());
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);

    mCompositeDisposable.add(disposable);

最佳答案

来自 ResourceSubscriber 的 Javadoc :

The default onStart() requests Long.MAX_VALUE by default. Override the method to request a custom positive amount. Use the protected request(long) to request more items and dispose() to cancel the sequence from within an onNext implementation.

    @Override
    public void onStart() {
        Log.e(TAG, "onStart");
        mView.showProgress();
        request(Long.MAX_VALUE);
    }

您还成功找到了Flowable.just 运算符,为什么要通过Flowable.create 复制它来发出标量sogs 列表?

关于java - RxJava2 Flowable 不调用 Subscriber 中的 onNext 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44923365/

相关文章:

java - rxjava 间隔与另一个可观察值相结合

java - FirebaseInstanceIdService 已弃用,我在重写代码时迷失了方向

java - 为交错 GridView 动态设置固定高度

kotlin - 如何使用 Observable.zip 保存类型?

java - 保存到内存 Android

android - Android 中的 TextWatcher 警告和慢速类型

android - RxJava - 给出异常后继续连接

java - GXT 3 使用 ListStore 显示集合的每个值一行

java - 无法为对象堆保留足够的空间

java - 如何避免JAVA中重复解析XML特殊字符