我有一个flow
( MutableSharedFlow
,如果相关的话)并且我有潜在昂贵的操作,我想异步执行该操作,同时仍然保持顺序。我使用 CompletableFuture
实现了我想要的目标:
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
fun process(flow: Flow<String>) = flow
.map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
.buffer(threadPoolSize)
.map { it.get() } // block current thread
.flowOn(threadPool.asCoroutineDispatcher())
得益于卸载到线程池的组合,固定大小buffer
和线程阻塞 CompletableFuture#get
,这段代码符合我的期望 - 最多 threadPoolSize
事件被并行处理,并按照接收顺序发送到流。
当我替换CompletableFuture#get
时带扩展功能CompletableFuture#await
来自kotlinx.coroutines.future
并使用flow
或async
而不是CompletableFuture#supplyAsync
,消息不再并行处理:
fun process(flow: Flow<String>) = flow
.map {
runBlocking {
future { expensiveHandle(it) } // same behaviour with async {...}
}
}
.buffer(threadPoolSize)
.map { it.await() }
.flowOn(threadPool.asCoroutineDispatcher())
我可以使用协程/挂起函数执行等效代码吗?
最佳答案
async
以及 future
都是 CoroutineScope
的扩展函数。所以,你需要一些
CoroutineScope
来调用它们。
runBlocking
提供了一些 CoroutineScope
,但它是一个阻塞调用,因此它在 suspend
函数中的使用 is prohibited .
您可以使用GlobalScope.async
,但它也是not recommended并且执行将由 Dispatchers.Default
分派(dispatch),而不是由 threadPool.asCoroutineDispatcher()
分派(dispatch),如 CompletableFuture
的原始示例中那样。
coroutineScope
和 withContext
函数将提供 CoroutineScope
,它从外部作用域继承其 coroutineContext
,因此流程处理将被暂停并立即执行 expenseHandle(it)
协程。
您需要使用工厂函数创建 CoroutineScope
,这样协程上下文就不会混合:
fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
val dispatcher = threadPool.asCoroutineDispatcher()
return flow
.map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
.buffer(threadPool.poolSize)
.map { it.await() }
.flowOn(dispatcher)
}
关于Kotlin 流程顺序异步处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66868578/