我正在研究 Kotlin Flow 在我当前的 Android 应用程序中的使用
我的应用程序通过 Retrofit API 调用从远程服务器检索数据。
其中一些 API 在 500 个项目页面中返回 50,000 个数据项目。
每个 API 响应都包含一个 HTTP 链接 header ,其中包含下一页完整 URL。
这些调用最多可能需要 2 秒才能完成。
为了减少运行时间,我使用了 Kotlin Flow 来同时处理每个页面 数据,同时还进行下一页 API 调用。
我的流程定义如下:
private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val internalWorkWorkState = MutableStateFlow<Response<List<MyPage>>?>(null)
private val workWorkState = internalWorkWorkState.asStateFlow()
private val myJob: Job
init {
myJob = GlobalScope.launch(persistenceThreadPool) {
workWorkState.collect { page ->
if (page == null) {
} else managePage(page!!)
}
}
}
我的递归函数定义如下,用于获取所有页面:-
private suspend fun managePages(accessToken: String, response: Response<List<MyPage>>) {
when {
result != null -> return
response.isSuccessful -> internalWorkWorkState.emit(response)
else -> {
manageError(response.errorBody())
result = Result.failure()
return
}
}
response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
if (parts.size >= 2) {
managePages(accessToken, service.myApiCall(accessToken, parts[1]))
}
}
}
private suspend fun managePage(response: Response<List<MyPage>>) {
val pages = response.body()
pages?.let {
persistResponse(it)
}
}
private suspend fun persistResponse(myPage: List<MyPage>) {
val myPageDOs = ArrayList<MyPageDO>()
myPage.forEach { page ->
myPageDOs.add(page.mapDO())
}
database.myPageDAO().insertAsync(myPageDOs)
}
我的众多问题是
此代码不会插入我检索到的所有数据项
检索完所有数据项后如何完成流程
检索并保存所有数据项后,如何完成 GlobalScope 作业
更新
通过进行以下更改,我已成功插入所有数据
private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val completed = CompletableDeferred<Int>()
private val channel = Channel<Response<List<MyPage>>?>(UNLIMITED)
private val channelFlow = channel.consumeAsFlow().flowOn(persistenceThreadPool)
private val frank: Job
init {
frank = GlobalScope.launch(persistenceThreadPool) {
channelFlow.collect { page ->
if (page == null) {
completed.complete(totalItems)
} else managePage(page!!)
}
}
}
...
...
...
channel.send(null)
completed.await()
return result ?: Result.success(outputData)
我不喜欢依赖 CompletableDeferred
,有没有比这更好的方法来知道 Flow 何时完成所有操作?
最佳答案
您正在寻找flow builder和 Flow.buffer() :
suspend fun getData(): Flow<Data> = flow {
var pageData: List<Data>
var pageUrl: String? = "bla"
while (pageUrl != null) {
TODO("fetch pageData from pageUrl and change pageUrl to the next page")
emitAll(pageData)
}
}
.flowOn(Dispatchers.IO /* no need for a thread pool executor, IO does it automatically */)
.buffer(3)
您可以像普通 Flow、迭代等一样使用它。如果您想知道输出的总长度,您应该使用可变闭包变量在使用者上计算它。请注意,您不需要在任何地方使用 GlobalScope(最好是永远)。
关于android - 如何在 Android Worker 中完成 Kotlin Flow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65358379/