kotlin - Android Kotlin Coroutines : what is the difference between flow, callbackFlow, channelFlow,... 其他流构造函数

标签 kotlin kotlin-coroutines kotlin-flow

我有代码应该使用流将 SharedPreferences 更改为可观察的存储,所以我有这样的代码

internal val onKeyValueChange: Flow<String> = channelFlow {
        val callback = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
            coroutineScope.launch {
                //send(key)
                offer(key)
            }
        }

        sharedPreferences.registerOnSharedPreferenceChangeListener(callback)

        awaitClose {
            sharedPreferences.unregisterOnSharedPreferenceChangeListener(callback)
        }
    }

或这个
internal val onKeyValueChange: Flow<String> = callbackFlow {
        val callback = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
            coroutineScope.launch {
                send(key)
                //offer(key)
            }
        }

        sharedPreferences.registerOnSharedPreferenceChangeListener(callback)

        awaitClose {
            sharedPreferences.unregisterOnSharedPreferenceChangeListener(callback)
        }
    }

然后我观察 token 、userId、companyId 的这个偏好,然后登录,但有一点很奇怪,因为我需要构建应用程序三次,比如更改 token 不会导致 tokenFlow 发出任何东西,然后第二次新的 userId 不会导致 userIdFlow 发出任何东西,然后在第三次登录后我可以注销/登录并且它可以工作。注销时,我正在清除 prefs token 、userId、companyId 中的所有 3 个属性存储。

最佳答案

对于 callbackFlow :

您不能使用 emit()如简单 Flow (因为它是一个 suspend 函数)在回调中。因此callbackFlow为您提供一种同步方式来使用 offer()选项。

例子:

fun observeData() = flow {
 myAwesomeInterface.addListener{ result ->
   emit(result) // NOT ALLOWED
 }
}

因此,协程为您提供了 callbackFlow 的选项:
fun observeData() = callbackFlow {
 myAwesomeInterface.addListener{ result ->
   offer(result) // ALLOWED
 }
 awaitClose{ myAwesomeInterface.removeListener() }
}

对于 channelFlow :

它与基本的主要区别Flowdocumentation 中有描述:

A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.


offer()仍然代表同样的事情。这只是 suspending 的同步方式(非 emit() 方式)或 send()
我建议你查一下 Romans Elizarov blog更详细的信息,尤其是this邮政。

关于您的代码,对于 callbackFlow你不需要协程启动:
coroutineScope.launch {
                send(key)
                //offer(key)
            }

只需使用 offer()

关于kotlin - Android Kotlin Coroutines : what is the difference between flow, callbackFlow, channelFlow,... 其他流构造函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61865744/

相关文章:

kotlin - kotlin 中的挂起函数/方法是如何工作的?

android - 将 viewModelScope 与 LiveData 一起使用时出现问题

android - 使用 repeatOnLifeCycle 从 UI 中的 Flow 收集

kotlin - 更改 LiveData 的源流

kotlin - Kotlin 中流发出的并行处理值

kotlin - Kotlin Native cinterop def文件:如何消除绝对路径?

android - Kotlin 内部类访问外部类?

kotlin - Kotlin 中 += 和添加到 MutableList 之间的区别

list - 当谓词为 true 时拆分列表

android - viewModelScope 默认为 MainThread 的原因是什么?