我不确定我想要的效果是否可行,所以先分享一下代码。下面的代码创建了一个回调流(生产者),并将其发送给 ViewModel(消费者)。
资源
/**
 * Closed set of states reported for an operation: a payload, a failure, or a progress message.
 *
 * @param T type of the payload carried by [Success].
 */
sealed class Resource<out T : Any> {

    /** Wraps the payload [data]. */
    data class Success<out T : Any>(val data: T) : Resource<T>()

    /** Wraps the [exception] describing a failure. */
    data class Error(val exception: Exception) : Resource<Nothing>()

    /** Wraps a progress [message]. */
    data class Loading(val message: String) : Resource<Nothing>()
}
存储库:
/**
 * Repository that simulates a callback-based API with a coroutine and exposes it as a [Flow].
 */
class MyRepository() {
    companion object {
        const val TAG = "___BBBMyRepository"
    }

    // Handle to the most recently launched simulation. Each call to performSomething overwrites it,
    // so only the latest job can be cancelled through this property.
    var globalJob: Job? = null

    /**
     * Launches a coroutine in [GlobalScope] that reports three loading steps, each followed by a
     * 2500 ms delay, and finally an error. Every value is delivered through [result], and the
     * launched job is stored in [globalJob].
     */
    private fun performSomething(result: ((Resource<Int>) -> Unit)) {
        globalJob = GlobalScope.launch {
            for (message in listOf("Loading-1", "Loading-2", "Loading-3")) {
                result(Resource.Loading(message))
                delay(2500)
            }
            result(Resource.Error(Exception("Try again...")))
        }
    }

    /**
     * Returns a flow that starts the simulated work when collected and forwards each reported
     * [Resource] to the collector. When the flow is closed or cancelled, [globalJob] is cancelled.
     */
    fun startPerform(): Flow<Resource<Int>> = callbackFlow {
        performSomething { resource ->
            Log.d(TAG, "performSomething $resource")
            trySend(resource)
        }
        awaitClose {
            globalJob?.cancel()
            Log.d(TAG, "startPerform awaitClose")
        }
    }
}
我的 View 模型:
/**
 * View model that starts collecting [MyRepository.startPerform] as soon as it is created and
 * logs every emitted value as well as the completion of the flow.
 */
@HiltViewModel
class BViewModel @Inject constructor(
    private val myRepository: MyRepository
) : ViewModel() {
    companion object {
        const val TAG = "___BBBViewModel"
    }

    init {
        Log.i(TAG, "Initialized")
        // Collection runs in viewModelScope.
        val loggedFlow = myRepository.startPerform()
            .onEach { resource -> Log.i(TAG, resource.toString()) }
            .onCompletion { Log.i(TAG, "OnCompletion") }
        loggedFlow.launchIn(viewModelScope)
    }

    override fun onCleared() {
        super.onCleared()
        Log.i(TAG, "OnCleared")
    }
}
如果我在流(flow)完成之前按后退键返回,则会触发 awaitClose{}
并且流会正常终止。
但是当出现错误时,我希望能够根据 ViewModel 的请求重新执行该过程。
所以我需要以某种方式从 ViewModel 向 startPerform 函数发送请求,就像 awaitClose 那样。
我想编写如下代码。这可能吗?
// Sketch from the question: `restartFlow { }` is the hypothetical hook being asked about;
// no such function is defined in the visible code.
fun startPerform(): Flow<Resource<Int>> = callbackFlow {
    performSomething { resource ->
        Log.d(TAG, "performSomething $resource")
        trySend(resource)
    }
    restartFlow {
    }
    awaitClose {
        globalJob?.cancel()
        Log.d(TAG, "startPerform awaitClose")
    }
}
init {
    Log.i(TAG, "Initialized")
    myRepository.startPerform()
        .onEach { resource ->
            Log.i(TAG, resource.toString())
            if (resource is Resource.Error) {
                // Hypothetical: the place where the consumer would like to ask the producer to restart.
                restartFlow
            }
        }
        .onCompletion { Log.i(TAG, "OnCompletion") }
        .launchIn(viewModelScope)
}
编辑_1
如果不可能像上面那样写。那么看来唯一的解决方案是使用如下所示的接口(interface)编写。下面的代码有什么需要改进或者需要注意的地方吗?
/**
 * Control hooks a consumer can invoke on a running producer.
 */
interface MyFlowListener {

    /** Requests that the work be started (again). */
    fun start()

    /** Signals that the consumer is done and the producer may finish. */
    fun completion()
}
// Listener published by the most recent startPerform() collection; null when none is active.
var startPerformListener: MyFlowListener? = null

/**
 * Returns a flow that runs the simulated work and lets the consumer control it through
 * [startPerformListener]: `start()` cancels the current job and launches the work again,
 * `completion()` cancels the job and closes the flow.
 *
 * The listener is removed again when the flow is closed or cancelled, so it cannot be invoked
 * on a channel that no longer accepts values.
 */
fun startPerform(): Flow<Resource<Int>> = callbackFlow {
    // Single place that launches the work and forwards each result into this flow.
    fun launchWork() {
        performSomething { result ->
            Log.d(TAG, "performSomething $result")
            trySend(result)
        }
    }

    val listener = object : MyFlowListener {
        override fun start() {
            globalJob?.cancel()
            launchWork()
        }

        override fun completion() {
            globalJob?.cancel()
            channel.close()
        }
    }
    startPerformListener = listener
    launchWork()

    awaitClose {
        globalJob?.cancel()
        // Unregister only if a newer collection has not replaced the listener in the meantime.
        if (startPerformListener === listener) {
            startPerformListener = null
        }
        Log.d(TAG, "startPerform awaitClose")
    }
}
init {
    Log.i(TAG, "Initialized")
    myRepository.startPerform()
        .onEach { resource ->
            Log.i(TAG, resource.toString())
            // Drive the producer through the listener it published: restart on error, finish on success.
            when (resource) {
                is Resource.Error -> myRepository.startPerformListener?.start()
                is Resource.Loading -> {}
                is Resource.Success -> myRepository.startPerformListener?.completion()
            }
        }
        .onCompletion { Log.i(TAG, "OnCompletion") }
        .launchIn(viewModelScope)
}
编辑_2
编辑:
我的队列机制:
/**
 * Queues BLE operations and runs them one at a time; all queue manipulation is posted to [handler].
 *
 * An operation stays pending (and [operationLock] stays locked) until [signalEndOfOperation] runs,
 * at which point the next queued operation, if any, is started.
 *
 * NOTE(review): `context`, `gattCallback`, `bluetoothGatt` and `profile` are referenced but not
 * defined in the visible code, and the two `override` callbacks have no visible supertype —
 * presumably they belong to parts of the class (e.g. a BluetoothGattCallback object) omitted
 * from this excerpt. Confirm against the full source.
 */
@SuppressLint("MissingPermission")
class BleDataSource @Inject constructor(
    private val handler: Handler
) {
    private val operationQueue = ConcurrentLinkedQueue<BleOperationType>()
    private val operationLock = ReentrantLock()
    private var pendingOperation: BleOperationType? = null

    /**
     * Enqueues a connect request for [device]; progress and the final outcome are delivered
     * through [result].
     */
    fun performConnect(device: BluetoothDevice, result: ((Resource<BleOperationResult>) -> Unit)) {
        enqueueOperation(Connect(device, result))
    }

    /** Adds [operation] to the queue on [handler] and starts it if nothing is currently running. */
    @Synchronized
    private fun enqueueOperation(operation: BleOperationType) {
        handler.post {
            operationQueue.add(operation)
            if (!operationLock.isLocked) {
                doNextOperation()
            }
        }
    }

    /**
     * Marks the pending operation as finished on [handler], releases [operationLock] and starts
     * the next queued operation, if any.
     */
    @Synchronized
    private fun signalEndOfOperation() {
        handler.post {
            pendingOperation = null
            // ReentrantLock.unlock() throws IllegalMonitorStateException when the calling thread
            // does not hold the lock, so this must run on the thread that executed doNextOperation.
            operationLock.unlock()
            if (operationQueue.isNotEmpty()) {
                doNextOperation()
            }
        }
    }

    /**
     * Takes the next operation off the queue, marks it as pending and executes it.
     * Does nothing when an operation is already running or the queue is empty.
     */
    @Synchronized
    private fun doNextOperation() {
        if (operationLock.isLocked) {
            Timber.i("doNextOperation already locked, returning...")
            return
        }
        val operation = operationQueue.poll() ?: run {
            Timber.v("Operation queue empty, returning...")
            return
        }
        operationLock.lock()
        pendingOperation = operation
        if (operation is Connect) {
            with(operation) {
                operation.result(Resource.Loading(message = "Connecting to ${device.name}"))
                bluetoothGatt = if (Build.VERSION.SDK_INT < Build.VERSION_CODES.M) {
                    device.connectGatt(context, false, gattCallback)
                } else {
                    device.connectGatt(context, false, gattCallback, BluetoothDevice.TRANSPORT_LE)
                }
            }
        }
    }

    /**
     * Translates a connection state change into a [Resource] for the pending connect operation.
     * A successful connection starts service discovery; any error ends the pending operation.
     */
    override fun onConnectionStateChange(gatt: BluetoothGatt, status: Int, newState: Int) {
        val deviceAddress = gatt.device.address
        val operation = pendingOperation
        val res: Resource<BleOperationResult> = when {
            status != BluetoothGatt.GATT_SUCCESS ->
                Resource.Error(errorMessage = "Error:$status encountered fro:$deviceAddress!")
            newState == BluetoothProfile.STATE_CONNECTED -> {
                gatt.discoverServices()
                Resource.Loading(message = "Discovering Services")
            }
            newState == BluetoothProfile.STATE_DISCONNECTED ->
                Resource.Error(errorMessage = "Unexpected Disconnected")
            // Any other state reported with a success status is treated as an error.
            else -> Resource.Error(errorMessage = "Unknown Error!")
        }
        if (operation is Connect) {
            operation.result(res)
            if (res is Resource.Error) {
                signalEndOfOperation()
            }
        }
    }

    /**
     * Reports the outcome of service discovery to the pending connect operation and ends it.
     */
    override fun onServicesDiscovered(gatt: BluetoothGatt?, status: Int) {
        val operation = pendingOperation
        val res: Resource<BleOperationResult> = if (status == BluetoothGatt.GATT_SUCCESS) {
            Resource.Success(data = BleOperationResult.ConnectionResult(profile))
        } else {
            Resource.Error(errorMessage = "Failed to discover services...")
        }
        if (operation is Connect) {
            operation.result(res)
        }
        if (pendingOperation is Connect) {
            signalEndOfOperation()
        }
    }
}
/**
 * Base type for operations placed on the BLE operation queue.
 */
abstract class BleOperationType {
    /** Callback that receives each [Resource] reported for this operation. */
    abstract val result: (Resource<BleOperationResult>) -> Unit
}
/** Operation requesting a connection to [device]; updates are delivered through [result]. */
data class Connect(
    val device: BluetoothDevice,
    override val result: (Resource<BleOperationResult>) -> Unit
) : BleOperationType()
最佳答案
您不应该在 callbackFlow
块内定义重试行为。您可以直接使用现有的 retry
流运算符。例如:
init {
    Log.i(TAG, "Initialized")

    // Marker exception: thrown from onEach so that retry() below re-collects the upstream flow.
    class ResourceError : Exception()

    myRepository.startPerform()
        .onEach { resource ->
            Log.i(TAG, resource.toString())
            if (resource is Resource.Error) {
                throw ResourceError()
            }
            //...
        }
        // Restart only for the marker exception; any other failure propagates.
        .retry { cause -> cause is ResourceError }
        .onCompletion { Log.i(TAG, "OnCompletion") }
        .launchIn(viewModelScope)
}
您的回调流(callbackFlow)实现存在缺陷。它一次只能跟踪一个 globalJob
,因此如果同时收集多个流,其中一些作业将被泄漏。如果在并行收集时取消了其中任何一个流,那么最新启动的那个作业可能会被提前取消。
我不确定您是否只是想用这个全局协程来模拟一个真实的基于回调的 API,因为这种写法确实很 hacky。如果后端是您自己设计的,那么一开始就没有理由使用 callbackFlow
。例如:
/**
 * Repository whose work is expressed directly as a cold [Flow], without any callback bridge.
 */
class MyRepository() {
    companion object {
        const val TAG = "___BBBMyRepository"
    }

    /**
     * Emits three loading steps, each followed by a 2500 ms delay, and then an error.
     */
    fun startPerform(): Flow<Resource<Int>> = flow {
        for (message in listOf("Loading-1", "Loading-2", "Loading-3")) {
            emit(Resource.Loading(message))
            delay(2500)
        }
        emit(Resource.Error(Exception("Try again...")))
    }
}
如果您尝试使用协程临时模拟基于回调的 API,以测试您对 callbackFlow
的使用,那么您需要对其进行修改,以便它可以同时处理多个“监听器”。也许是这样的:
/**
 * Repository that simulates a callback-based API and bridges it to a [Flow], keeping one
 * job per collection so that several collectors can run at the same time.
 */
class MyRepository() {
    companion object {
        const val TAG = "___BBBMyRepository"
    }

    /**
     * Launches the simulated work in [GlobalScope] and returns its [Job]: three loading steps,
     * each followed by a 2500 ms delay, then an error, all delivered through [result].
     */
    private fun performSomething(result: ((Resource<Int>) -> Unit)): Job = GlobalScope.launch {
        for (message in listOf("Loading-1", "Loading-2", "Loading-3")) {
            result(Resource.Loading(message))
            delay(2500)
        }
        result(Resource.Error(Exception("Try again...")))
    }

    /**
     * Returns a flow that starts the simulated work on collection and forwards each result;
     * the job started for this collection is cancelled when the flow is closed or cancelled.
     */
    fun startPerform(): Flow<Resource<Int>> = callbackFlow {
        val simulation = performSomething { resource ->
            Log.d(TAG, "performSomething $resource")
            trySend(resource)
        }
        awaitClose {
            simulation.cancel()
            Log.d(TAG, "startPerform awaitClose")
        }
    }
}
关于android - 从消费者发送消息到回调流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76070426/