使用 RxJava 或 kotlin 协程的 Android ViewState

标签 android rx-java2 kotlinx.coroutines

我正在尝试学习如何在 Android 中使用 RxJava,但遇到了死胡同。我有以下数据源:

object DataSource {

    enum class FetchStyle {
        FETCH_SUCCESS,
        FETCH_EMPTY,
        FETCH_ERROR
    }

    var relay: BehaviorRelay<FetchStyle> = BehaviorRelay.createDefault(FetchStyle.FETCH_ERROR)

    fun fetchData(): Observable<DataModel> {
        return relay
            .map { f -> loadData(f) }
    }

    private fun loadData(f: FetchStyle): DataModel {
        Thread.sleep(5000)

        return when (f) {
            FetchStyle.FETCH_SUCCESS -> DataModel("Data Loaded")
            FetchStyle.FETCH_EMPTY -> DataModel(null)
            FetchStyle.FETCH_ERROR -> throw IllegalStateException("Error Fetching")
        }
    }
}

每当我更改 relay 的值时,我想触发下游更新,但这并没有发生。它在 Activity 初始化时起作用,但在我更新值时不起作用。这是我的 ViewModel,我从那里更新值:

class MainViewModel : ViewModel() {

    val fetcher: Observable<UiStateModel> = DataSource.fetchData().replay(1).autoConnect()
        .map { result -> UiStateModel.from(result) }
        .onErrorReturn { exception -> UiStateModel.Error(exception) }
        .startWith(UiStateModel.Loading())
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())

    fun loadSuccess() {
        DataSource.relay.accept(DataSource.FetchStyle.FETCH_SUCCESS)
    }

    fun loadEmpty() {
        DataSource.relay.accept(DataSource.FetchStyle.FETCH_EMPTY)
    }

    fun loadError() {
        DataSource.relay.accept(DataSource.FetchStyle.FETCH_ERROR)
    }
}

这是执行订阅的 Activity 中的代码:

model.fetcher
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                    uiState -> mainPresenter.loadView(uiState)
            })

最佳答案

最终改用 kotlin 协程,因为我无法重新订阅 ConnectableObservable 并开始新的获取。

这是任何感兴趣的人的代码。

主持人:

class MainPresenter(val view: MainView) {

    private lateinit var subscription: SubscriptionReceiveChannel<UiStateModel>

    fun loadSuccess(model: MainViewModel) {
        model.loadStyle(DataSource.FetchStyle.FETCH_SUCCESS)
    }

    fun loadError(model: MainViewModel) {
        model.loadStyle(DataSource.FetchStyle.FETCH_ERROR)
    }

    fun loadEmpty(model: MainViewModel) {
        model.loadStyle(DataSource.FetchStyle.FETCH_EMPTY)
    }

    suspend fun subscribe(model: MainViewModel) {
        subscription = model.connect()
        subscription.subscribe { loadView(it) }
    }

    private fun loadView(uiState: UiStateModel) {
        when(uiState) {
            is Loading -> view.isLoading()
            is Error -> view.isError(uiState.exception.localizedMessage)
            is Success -> when {
                uiState.result != null -> view.isSuccess(uiState.result)
                else -> view.isEmpty()
            }
        }
    }

    fun unSubscribe() {
        subscription.close()
    }
}

inline suspend fun <E> SubscriptionReceiveChannel<E>.subscribe(action: (E) -> Unit) = consumeEach { action(it) }

View :

...
override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        launch(UI) {
            mainPresenter.subscribe(model)
        }

        btn_load_success.setOnClickListener {
            mainPresenter.loadSuccess(model)
        }

        btn_load_error.setOnClickListener {
            mainPresenter.loadError(model)
        }

        btn_load_empty.setOnClickListener {
            mainPresenter.loadEmpty(model)
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        Log.d("View", "onDestroy()")
        mainPresenter.unSubscribe()
    }
...

模型:

class MainViewModel : ViewModel() {

    val TAG = this.javaClass.simpleName

    private val stateChangeChannel = ConflatedBroadcastChannel<UiStateModel>()

    init {
        /** When the model is initialized we immediately start fetching data */
        fetchData()
    }

    override fun onCleared() {
        super.onCleared()
        Log.d(TAG, "onCleared() called")
        stateChangeChannel.close()
    }

    fun connect(): SubscriptionReceiveChannel<UiStateModel> {
        return stateChangeChannel.openSubscription()
    }

    fun fetchData() = async {
        stateChangeChannel.send(UiStateModel.Loading())
        try {
            val state = DataSource.loadData().await()
            stateChangeChannel.send(UiStateModel.from(state))

        } catch (e: Exception) {
            Log.e("MainModel", "Exception happened when sending new state to channel: ${e.cause}")
        }
    }

    internal fun loadStyle(style: DataSource.FetchStyle) {
        DataSource.style = style
        fetchData()
    }
}

这里是 a link to the project on github .

关于使用 RxJava 或 kotlin 协程的 Android ViewState,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48700416/

相关文章:

android - 向/从内部文件写入和读取字符串

android - 创建的 Android 虚拟设备不显示

android - 无法使用 Mockito 测试使用协程和挂起函数的代码

android - Kotlin 启动协程跳过 Google Volley 从服务器检索信息的代码行

java - : java. lang.reflect.InitationTargetException 引起的错误

java - String.split() 不工作

android - 当下一个依赖于前一个时,如何连接多个 RxJava observable?

java - 使用 RXJava2 合并/展平 Android 中的嵌套列表

android - RxJava2 如何在请求参数更改时更新现有订阅

kotlin - 有没有办法使用协程获得FixedTreadPool 行为?