android - Sending messages from the consumer to a callbackFlow

Tags: android kotlin kotlin-coroutines kotlin-flow

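I'm not sure whether what I want is possible at all, but let me share the code first. The code below simply creates a callbackFlow (the producer) and collects it in the ViewModel (the consumer).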

Resource:

sealed class Resource<out T:Any>{
    data class Success<out T:Any> (val data:T):Resource<T>()
    data class Error(val exception: Exception):Resource<Nothing>()
    data class Loading(val message:String):Resource<Nothing>()
}

Repository:

class MyRepository() {

    companion object {
        const val TAG = "___BBBMyRepository"
    }

    var globalJob: Job? = null
    private fun performSomething(result: ((Resource<Int>) -> Unit)) {
        globalJob = GlobalScope.launch {
            result(Resource.Loading("Loading-1"))
            delay(2500)
            result(Resource.Loading("Loading-2"))
            delay(2500)
            result(Resource.Loading("Loading-3"))
            delay(2500)
            result(Resource.Error(Exception("Try again...")))
        }
    }

    fun startPerform(): Flow<Resource<Int>> = callbackFlow {
        performSomething{ result ->
            Log.d(TAG, "performSomething $result")
            trySend(result)
        }
        awaitClose {
            globalJob?.cancel()
            Log.d(TAG, "startPerform awaitClose")
        }
    }
}

My ViewModel:

@HiltViewModel
class BViewModel @Inject constructor(
    private val myRepository: MyRepository
): ViewModel() {

    companion object {
        const val TAG = "___BBBViewModel"
    }

    init {
        Log.i(TAG, "Initialized")
        myRepository.startPerform()
            .onEach {
                Log.i(TAG, it.toString())
            }.onCompletion {
                Log.i(TAG, "OnCompletion")
            }.launchIn(viewModelScope)
    }

    override fun onCleared() {
        super.onCleared()
        Log.i(TAG, "OnCleared")
    }
}

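If I press Back before the flow completes, awaitClose {} is triggered and the flow terminates successfully.

But when an error occurs, I want to restart the process at the ViewModel's request.

So I somehow need to send a request from the ViewModel to the startPerform function, similar to what happens in awaitClose.

I'd like to write code like the following. Is this possible?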

    fun startPerform(): Flow<Resource<Int>> = callbackFlow {
        performSomething{ result ->
            Log.d(TAG, "performSomething $result")
            trySend(result)
        }
        
        restartFlow { 
            
        }
        
        awaitClose {
            globalJob?.cancel()
            Log.d(TAG, "startPerform awaitClose")
        }
    }
    init {
        Log.i(TAG, "Initialized")
        myRepository.startPerform()
            .onEach {
                Log.i(TAG, it.toString())
                if ( it is Resource.Error ) { //-----------------
                    restartFlow //-----------------
                } //-----------------
            }.onCompletion {
                Log.i(TAG, "OnCompletion")
            }.launchIn(viewModelScope)
    }

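Edit 1

If it can't be written like the above, then the only solution seems to be an interface like the one below. Is there anything in this code that should be improved or that I should watch out for?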

interface MyFlowListener {
    fun start()
    fun completion()
}

    var startPerformListener: MyFlowListener? = null
    fun startPerform(): Flow<Resource<Int>> = callbackFlow {
        startPerformListener = object : MyFlowListener {
            override fun start() {
                globalJob?.cancel()
                performSomething{ result ->
                    Log.d(TAG, "performSomething $result")
                    trySend(result)
                }
            }

            override fun completion() {
                globalJob?.cancel()
                channel.close()
            }
        }
        performSomething{ result ->
            Log.d(TAG, "performSomething $result")
            trySend(result)
        }


        awaitClose {
            globalJob?.cancel()
            Log.d(TAG, "startPerform awaitClose")
        }
    }
    init {
        Log.i(TAG, "Initialized")
        myRepository.startPerform()
            .onEach {
                Log.i(TAG, it.toString())
                when ( it ) {
                    is Resource.Error -> {
                        myRepository.startPerformListener?.start()
                    }
                    is Resource.Loading -> {}
                    is Resource.Success -> {
                        myRepository.startPerformListener?.completion()
                    }
                }
            }.onCompletion {
                Log.i(TAG, "OnCompletion")
            }.launchIn(viewModelScope)
    }

Edit 2

My queue mechanism:


@SuppressLint("MissingPermission")
class BleDataSource  @Inject constructor(
    private val handler: Handler
) {

    private val operationQueue = ConcurrentLinkedQueue<BleOperationType>()
    private val operationLock = ReentrantLock()
    private var pendingOperation: BleOperationType? = null

    fun performConnect(device: BluetoothDevice, result: ((Resource<BleOperationResult>) -> Unit)) {
        enqueueOperation(Connect(device, result))
    }

    @Synchronized
    private fun enqueueOperation(operation: BleOperationType) {
        handler.post {
            operationQueue.add(operation)
            if ( !operationLock.isLocked ) {
                doNextOperation()
            }
        }
    }


    @Synchronized
    private fun signalEndOfOperation() {
        handler.post {
            pendingOperation = null
            operationLock.unlock()
            if ( operationQueue.isNotEmpty() ) {
                doNextOperation()
            }
        }
    }


    @Synchronized
    private fun doNextOperation() {
        if ( operationLock.isLocked ) {
            Timber.i("doNextOperation already locked, returning...")
            return
        }

        val operation = operationQueue.poll() ?: run {
            Timber.v("Operation queue empty, returning...")
            return
        }
        operationLock.lock()
        pendingOperation = operation


        if ( operation is Connect ) {
            with(operation) {
                operation.result(Resource.Loading(message = "Connecting to ${device.name}"))
                bluetoothGatt = if ( Build.VERSION.SDK_INT < Build.VERSION_CODES.M ) {
                    device.connectGatt(context, false, gattCallback)
                } else {
                    device.connectGatt(context, false, gattCallback, BluetoothDevice.TRANSPORT_LE)
                }
            }
        }

    }


        override fun onConnectionStateChange(gatt: BluetoothGatt, status: Int, newState: Int) {
            val deviceAddress = gatt.device.address
            val operation = pendingOperation
            var res: Resource<BleOperationResult> = Resource.Error(errorMessage = "Unknown Error!")

            if ( status == BluetoothGatt.GATT_SUCCESS ) {
                if ( newState == BluetoothProfile.STATE_CONNECTED ) {
                    res = Resource.Loading(message = "Discovering Services")
                    gatt.discoverServices()
                } else if ( newState == BluetoothProfile.STATE_DISCONNECTED ) {
                    res = Resource.Error(errorMessage = "Unexpected Disconnected")
                }
            } else {
                res = Resource.Error(errorMessage = "Error: $status encountered for: $deviceAddress!")
            }

            if ( operation is Connect ) {
                operation.result(res)
            }
            if ( res is Resource.Error ) {
                if ( operation is Connect ) {
                    signalEndOfOperation()
                }
            }
        }


        override fun onServicesDiscovered(gatt: BluetoothGatt?, status: Int) {
            val operation = pendingOperation
            var res: Resource<BleOperationResult> = Resource.Error(errorMessage = "Unknown Error!")

            if ( status == BluetoothGatt.GATT_SUCCESS ) {
                res = Resource.Success(data = BleOperationResult.ConnectionResult(profile))
            } else {
                res = Resource.Error(errorMessage = "Failed to discover services...")
            }

            if ( operation is Connect ) {
                operation.result(res)
            }
            if ( pendingOperation is Connect ) {
                signalEndOfOperation()
            }
        }

abstract class BleOperationType {
    abstract val result: ((Resource<BleOperationResult>) -> Unit)
}
data class Connect(val device: BluetoothDevice,
                   override val result: ((Resource<BleOperationResult>) -> Unit)) : BleOperationType()

Best answer

You wouldn't define retry behavior inside the callbackFlow block. You can use the existing retry flow operator instead. For example:

init {
    Log.i(TAG, "Initialized")

    class ResourceError: Exception()

    myRepository.startPerform()
        .onEach {
            Log.i(TAG, it.toString())
            if (it is Resource.Error) {
                throw ResourceError()
            }
            //...
        }
        .retry { it is ResourceError }
        .onCompletion {
            Log.i(TAG, "OnCompletion")
        }.launchIn(viewModelScope)
}
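If unbounded, immediate retries are not wanted, the same idea can be expressed with retryWhen, which also receives the attempt index. The following is only a minimal sketch under stated assumptions: retryOnResourceError, its parameters, and the fake startPerform (which fails twice and then succeeds) are hypothetical names made up for this illustration, not part of the question's code.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Resource<out T : Any> {
    data class Success<out T : Any>(val data: T) : Resource<T>()
    data class Error(val exception: Exception) : Resource<Nothing>()
    data class Loading(val message: String) : Resource<Nothing>()
}

// Hypothetical marker exception, thrown only to trigger a retry.
class ResourceError(cause: Exception) : Exception(cause)

// Hypothetical stand-in for the repository: the first two collections
// end with an Error, the third one ends with a Success.
var attempts = 0
fun startPerform(): Flow<Resource<Int>> = flow {
    attempts++
    emit(Resource.Loading("Loading-$attempts"))
    if (attempts < 3) emit(Resource.Error(Exception("Try again...")))
    else emit(Resource.Success(42))
}

// Hypothetical helper: turns Resource.Error into an exception and retries
// the upstream flow at most maxRetries times, waiting a bit longer each time.
fun <T : Any> Flow<Resource<T>>.retryOnResourceError(
    maxRetries: Long,
    delayMillis: Long
): Flow<Resource<T>> =
    onEach { if (it is Resource.Error) throw ResourceError(it.exception) }
        .retryWhen { cause, attempt ->
            // attempt is zero-based: 0 for the first retry decision.
            val shouldRetry = cause is ResourceError && attempt < maxRetries
            if (shouldRetry) delay(delayMillis * (attempt + 1))
            shouldRetry
        }

fun main() = runBlocking {
    val results = startPerform()
        .retryOnResourceError(maxRetries = 3, delayMillis = 10)
        .toList()
    println(results.size)   // 4: three Loading values, then one Success
    println(results.last()) // Success(data=42)
}
```

Because the exception is thrown in onEach, the Error value itself never reaches the downstream collector. If the UI should still show the error before the retry happens, it would have to be logged or published before throwing.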

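Your callbackFlow implementation has a flaw. It can only track one globalJob at a time, so if the flow is collected more than once at the same time, some of the jobs will be leaked. And if any one of the parallel collections is cancelled, the most recently started one may be cancelled prematurely.

I'm not sure whether you're just using this global coroutine to simulate an actual callback-based API, because it's really hacky. If you're designing the backend yourself, there's no reason to use callbackFlow in the first place. For example: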

class MyRepository() {

    companion object {
        const val TAG = "___BBBMyRepository"
    }

    fun startPerform(): Flow<Resource<Int>> = flow {
        emit(Resource.Loading("Loading-1"))
        delay(2500)
        emit(Resource.Loading("Loading-2"))
        delay(2500)
        emit(Resource.Loading("Loading-3"))
        delay(2500)
        emit(Resource.Error(Exception("Try again...")))
    }
}

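If you're using the coroutine as a temporary stand-in for a callback-based API in order to test your use of callbackFlow, you need to modify it so that it can handle multiple "listeners" at once. Maybe something like this: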

class MyRepository() {

    companion object {
        const val TAG = "___BBBMyRepository"
    }

    private fun performSomething(result: ((Resource<Int>) -> Unit)): Job {
        return GlobalScope.launch {
            result(Resource.Loading("Loading-1"))
            delay(2500)
            result(Resource.Loading("Loading-2"))
            delay(2500)
            result(Resource.Loading("Loading-3"))
            delay(2500)
            result(Resource.Error(Exception("Try again...")))
        }
    }

    fun startPerform(): Flow<Resource<Int>> = callbackFlow {
        val job = performSomething { result ->
            Log.d(TAG, "performSomething $result")
            trySend(result)
        }
        awaitClose {
            job.cancel()
            Log.d(TAG, "startPerform awaitClose")
        }
    }
}
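If the restart should be requested explicitly by the consumer (for example from a "Retry" button) rather than happen automatically, one possible approach is to keep the trigger outside the callbackFlow and re-collect the flow whenever the trigger fires. The sketch below is an assumption-laden illustration, not the answer's method: RestartableSource, restart(), and demo() are hypothetical names, plain strings stand in for Resource values, and it is written for a single collector only.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper: re-collects the flow produced by source() each time
// restart() is called. Intended for one collector at a time.
class RestartableSource<T>(private val source: () -> Flow<T>) {
    // Conflated: several restart requests in a row collapse into one.
    private val restarts = Channel<Unit>(Channel.CONFLATED)

    fun restart() {
        restarts.trySend(Unit)
    }

    @OptIn(ExperimentalCoroutinesApi::class)
    val flow: Flow<T> = restarts.receiveAsFlow()
        .onStart { emit(Unit) }      // start the first run without a request
        .flatMapLatest { source() }  // cancel the current run, start a new one
}

suspend fun demo(): List<String> {
    var runs = 0
    // Stand-in for myRepository.startPerform(): fails once, then succeeds.
    val restartable = RestartableSource<String> {
        flow {
            runs++
            emit("Loading-$runs")
            emit(if (runs < 2) "Error" else "Success")
        }
    }
    val seen = mutableListOf<String>()
    restartable.flow
        .onEach {
            seen += it
            if (it == "Error") restartable.restart() // the consumer asks for a restart
        }
        .first { it == "Success" } // stop collecting once a run succeeds
    return seen
}

fun main() = runBlocking {
    println(demo()) // [Loading-1, Error, Loading-2, Success]
}
```

With a callbackFlow as the source, flatMapLatest cancelling the previous run means its awaitClose block is invoked, so the underlying job or listener is cleaned up before the next run starts. Note that the resulting flow never completes on its own; it ends when the collecting scope (such as viewModelScope) is cancelled.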

Source: the Stack Overflow question "android - Sending messages from the consumer to a callbackFlow": https://stackoverflow.com/questions/76070426/
