android - RxJava Observable.create 包装可观察订阅

标签 android rx-java kotlin rx-java2

我使用 Observable.create 以便在某些数据可用时通知订阅者。我有点不确定在我的 create 方法中订阅 observables。这些嵌套订阅会给我带来什么问题吗?我并不完全熟悉使用 Observable.create 创建可观察对象,所以我想确保我没有做任何不寻常的事情或滥用它。提前致谢!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) {

    abstract fun fetchFromApi(): Single<ApiType>
    abstract fun fetchFromDb(): Observable<Optional<DbType>>
    abstract fun saveToDb(apiType: ApiType?)
    abstract fun shouldFetchFromApi(cache: DbType?): Boolean

    fun fetch(): Observable<Optional<DbType>>  {
        return Observable.create<Optional<DbType>> {
            val subscriber = it

            fetchFromDb()
                    .subscribe({
                        subscriber.onNext(it)

                        if(shouldFetchFromApi(it.get())) {
                            fetchFromApi()
                                    .observeOn(schedulerProvider.io())
                                    .map {
                                        saveToDb(it)
                                        it
                                    }
                                    .observeOn(schedulerProvider.ui())
                                    .flatMapObservable {
                                        fetchFromDb()
                                    }
                                    .subscribe({
                                        subscriber.onNext(it)
                                        subscriber.onComplete()
                                    })
                        }
                        else {
                            subscriber.onComplete()
                        }
                    })

        }
    }
}

最佳答案

是的,这会导致问题。

首先,像这样嵌套 Observable 不是惯用的,Reactive 方法的优势之一是组合 Observables,因此只有一个干净的流。通过这种方式,你打破了链条,直接的结果是交织在一起的代码更难阅读,更多的代码来连接通知事件,基本上就像用 Observable 包装异步回调方法.
在这里,因为您已经有了响应式(Reactive)组件,所以您可以简单地组合它们,而不是使用回调方法来处理它们。

其次,作为断开链的结果,最严重和最直接的一个 - 取消订阅外部 Observable 不会自动影响内部 Observable。这同样适用于尝试添加 subscribeOn() 并且在背压很重要的不同场景中它也适用。

作曲的替代方案可能是这样的:

fun fetch2(): Observable<Optional<DbType>> {
        return fetchFromDb()
                .flatMap {
                    if (shouldFetchFromApi(it.get())) {
                        fetchFromApi()
                                .observeOn(schedulerProvider.io())
                                .doOnSuccess { saveToDb(it) }
                                .observeOn(schedulerProvider.ui())
                                .flatMapObservable {
                                    fetchFromDb()
                                }

                    } else {
                        Observable.empty()
                    }
                }
    }

如果出于某种原因,您希望在任何情况下单独发出第一个 fetchFromDb() 结果,您也可以使用带有选择器的 publish() 来实现:

 fun fetch2(): Observable<Optional<DbType>> {
    return fetchFromDb()
            .publish {
                Observable.merge(it,
                        it.flatMap {
                            if (shouldFetchFromApi(it.get())) {
                                fetchFromApi()
                                        .observeOn(schedulerProvider.io())
                                        .doOnSuccess { saveToDb(it) }
                                        .observeOn(schedulerProvider.ui())
                                        .flatMapObservable {
                                            fetchFromDb()
                                        }

                            } else {
                                Observable.empty()
                            }
                        })
            }

}

关于android - RxJava Observable.create 包装可观察订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44503112/

相关文章:

Android ViewModel 观察 Mu​​tableLiveData<ArrayList<String>>

android - Retrofit链如何调用?

android - Observable.retrywhen 中的异常类型

android - RxJava 取消 'Single.fromCallable' 请求,disposable.clear 导致 InterruptedIOException

android - Android checkSelfPermission()中的 "this"指的是什么?

winapi - 如何初始化 LPBOOL? Kotlin Native 中的 Win32 Api

java - 改造和 GRPC

java - 如何在后台线程中正确执行SQL查询?

string - 如何在 Kotlin 字符串模板中嵌入 for 循环

Android:GoogleIdTokenVerifier.Builder 中的传输和 jsonFactory 是什么?