android - 如何在 Android Worker 中完成 Kotlin Flow

标签 android kotlin-flow

我正在研究 Kotlin Flow 在我当前的 Android 应用程序中的使用

我的应用程序通过 Retrofit API 调用从远程服务器检索数据。

其中一些 API 在 500 个项目页面中返回 50,000 个数据项目。

每个 API 响应都包含一个 HTTP 链接 header ,其中包含下一页完整 URL。

这些调用最多可能需要 2 秒才能完成。

为了减少运行时间,我使用了 Kotlin Flow 来同时处理每个页面 数据,同时还进行下一页 API 调用。

我的流程定义如下:

private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val internalWorkWorkState = MutableStateFlow<Response<List<MyPage>>?>(null)
private val workWorkState = internalWorkWorkState.asStateFlow()

private val myJob: Job

init {
    myJob = GlobalScope.launch(persistenceThreadPool) {
        workWorkState.collect { page ->
            if (page == null) {
            } else managePage(page!!)
        }
    }
}

我的递归函数定义如下,用于获取所有页面:-

    private suspend fun managePages(accessToken: String, response: Response<List<MyPage>>) {
        when {
            result != null -> return
            response.isSuccessful -> internalWorkWorkState.emit(response)
            else -> {
                manageError(response.errorBody())
                result = Result.failure()
                return
            }
        }

        response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
            val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
            if (parts.size >= 2) {
                managePages(accessToken, service.myApiCall(accessToken, parts[1]))
            }
        }
    }

   private suspend fun managePage(response: Response<List<MyPage>>) {
        val pages = response.body()

        pages?.let {
            persistResponse(it)
        }
    }

    private suspend fun persistResponse(myPage: List<MyPage>) {
        val myPageDOs = ArrayList<MyPageDO>()

        myPage.forEach { page ->
            myPageDOs.add(page.mapDO())
        }

        database.myPageDAO().insertAsync(myPageDOs)
    }
    

我的众多问题是

  1. 此代码不会插入我检索到的所有数据项

  2. 检索完所有数据项后如何完成流程

  3. 检索并保存所有数据项后,如何完成 GlobalScope 作业

更新

通过进行以下更改,我已成功插入所有数据

 private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
    private val completed = CompletableDeferred<Int>()

    private val channel = Channel<Response<List<MyPage>>?>(UNLIMITED)
    private val channelFlow = channel.consumeAsFlow().flowOn(persistenceThreadPool)

    private val frank: Job

    init {
        frank = GlobalScope.launch(persistenceThreadPool) {
            channelFlow.collect { page ->
                if (page == null) {
                    completed.complete(totalItems)
                } else managePage(page!!)
            }
        }
    }


...
...
...

   channel.send(null)
   completed.await()

   return result ?: Result.success(outputData)

我不喜欢依赖 CompletableDeferred,有没有比这更好的方法来知道 Flow 何时完成所有操作?

最佳答案

您正在寻找flow builderFlow.buffer() :

suspend fun getData(): Flow<Data> = flow {
    var pageData: List<Data>
    var pageUrl: String? = "bla"
    while (pageUrl != null) {
        TODO("fetch pageData from pageUrl and change pageUrl to the next page")
        emitAll(pageData)
    }
}
    .flowOn(Dispatchers.IO /* no need for a thread pool executor, IO does it automatically */)
    .buffer(3)

您可以像普通 Flow、迭代等一样使用它。如果您想知道输出的总长度,您应该使用可变闭包变量在使用者上计算它。请注意,您不需要在任何地方使用 GlobalScope(最好是永远)。

关于android - 如何在 Android Worker 中完成 Kotlin Flow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65358379/

相关文章:

android - 我必须使用哪个功能才能使我的应用程序仅适用于包含 Orientation Sensor 的设备?

android-jetpack-compose - Ui 未从 viewmodel kotlin flow 更新

retrofit - Kotlin Flow - Retrofit 与 Result 密封类的通用函数

kotlin - Jetpack 撰写 : Notify changes in StateFlow

Kotlin 协程流程 : How to get the first item from a flow (i.文件元数据)并将其余项目作为内容流传递?

Android SDK 居中对齐两个元素

android - 在 Android 中以管理员身份禁用应用程序之前需要密码

android - java.lang.ClassCastException : android. view.ViewGroup$LayoutParams 无法转换为 android.widget.Gallery$LayoutParams

firebase - 为什么 trySend 会发出假数据?

android - Kotlin 函数返回值被保存在 firebase 中