android - 使用协同程序读取和复制文件

标签 android kotlin asynchronous kotlin-coroutines fileinputstream

我创建了以下应用程序来说明一些疑问。 My Example on the Github
在此示例中,我将文件复制到另一个包。
我的疑问如下:

  • 并行执行任务,是否可以返回取消之前已完成的值?
  • 在使用IO上下文时,为什么在contentResolver.openInputStream (uri)中出现消息“不适当的阻塞方法调用”?
  • 在读取文件条目以复制到输出时,我总是检查作业状态,以便在取消此任务时立即停止它,删除创建的输出文件并返回取消异常,对吗?
  • 我可以界定执行的任务数量吗?

  • 我的onCreate:
    private val listUri = mutableListOf<Uri>()
    private val job = Job()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        
        //get files from 1 to 40
        val packageName = "android.resource://${packageName}/raw/"
        for (i in 1..40) {
            listUri.add(Uri.parse("${packageName}file$i"))
        }
    }
    
    我的按钮 Action :
      //Button action
       fun onClickStartTask(view: View) {
            var listNewPath = emptyList<String>()
            CoroutineScope(Main + job).launch {
                try {
                    //shows something in the UI - progressBar
                    withContext(IO) {
                        listNewPath = listUri.map { uri ->
                            async {
                                //path to file temp
                                val pathFileTemp =
                                    "${getExternalFilesDir("Temp").toString()}/${uri.lastPathSegment}"
                                val file = File(pathFileTemp)
                                val inputStream = contentResolver.openInputStream(uri)
                                inputStream?.use { input ->
                                    FileOutputStream(file).use { output ->
                                        val buffer = ByteArray(1024)
                                        var read: Int = input.read(buffer)
                                        while (read != -1) {
                                            if (isActive) {
                                                output.write(buffer, 0, read)
                                                read = input.read(buffer)
                                            } else {
                                                input.close()
                                                output.close()
                                                file.deleteRecursively()
                                                throw CancellationException()
                                            }
                                        }
                                    }
                                }
                                //If completed then it returns the new path.
                                return@async pathFileTemp
                            }
                        }.awaitAll()
                    }
                } finally {
                    //shows list complete in the UI
                }
            }
        }
    
    我的取消工作的按钮:
    fun onClickCancelTask(view: View) {
        if (job.isActive) {
            job.cancelChildren()
            println("Cancel children")
        }
    }
    
    这将是执行任务的按钮 Action 。
    我感谢所有帮助。

    最佳答案

    回答1.和4 .:
    为了划定并行任务并让它们独立完成(获取一些值,同时取消其余任务),您将需要使用Channel,最好使用Flow。简化示例:

    fun processListWithSomeWorkers(list: List<Whatever>, concurrency: Int): Flow<Result> = channelFlow {
       val workToDistribute = Channel<Whatever>()
       launch { for(item in list) workToDistribute.send(item) } // one coroutine distributes work...
    
        repeat(concurrency) { // launch a specified number of worker coroutines
          launch { 
             for (task in workToDistribute) { // which process tasks in a loop
                val atomicResult = process(task)
                send(atomicResult) // and send results downstream to a Flow
             }
          }
       }
    }
    
    然后,您可以逐一处理结果,因为它们正在生成,等待整个流程完成,例如只需在需要时服用其中一些:resultFlow.take(20).onEach { ... }.collectIn(someScope)因为它是Flow,所以只有在有人开始收集(很冷)时,它才会开始工作,这通常是一件好事。
    整个过程可能会更短一些,因为您会发现一些更具体和实验性的功能(作为产品)。可以将其概括为Flow运算符,如下所示:
    fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> {
        require(concurrency > 1) { "No sense with concurrency < 2" }
        return channelFlow {
            val inputChannel = produceIn(this)
            repeat(concurrency) {
                launch {
                    for (input in inputChannel) send(transform(input))
                }
            }
        }
    }
    
    并使用:list.asFlow().concurrentMap(concurrency = 4) { <your mapping logic> }corotuines团队正在考虑向Flow流添加一系列并行运算符,但它们还没有AFAIK。

    关于android - 使用协同程序读取和复制文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62486876/

    相关文章:

    android - 如何在 Flutter 中选择项目后更改卡片颜色?

    c++ - 我可以精确地时间重叠 ReadFileEx 操作吗?

    android - 您可以在新版本的 android 上运行旧应用程序吗?

    android - 从 ListView( fragment 、BaseAdapter)中单击图像后删除项目

    java - 我无法向 android studio 中的 color.xml 文件添加颜色

    kotlin - 无法解析: androidx. compose.ui:ui-tooling和其他模块

    kotlin - 为什么 Kotlin '===' 引用相等运算符对相同的对象引用返回 false?

    c# - sql异步查询问题

    javascript - 在 angularjs 中使用 $q 在数组上循环异步函数

    android - 如何在线保存/加载数据(使用 AJAX 和 JSON 存储数据)和离线(本地)