Kotlin 流程顺序异步处理

标签 kotlin asynchronous future kotlin-coroutines kotlin-flow

我有一个flow ( MutableSharedFlow ,如果相关的话)并且我有潜在昂贵的操作,我想异步执行该操作,同时仍然保持顺序。我使用 CompletableFuture 实现了我想要的目标:

private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)

fun process(flow: Flow<String>) = flow
    .map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
    .buffer(threadPoolSize)
    .map { it.get() } // block current thread
    .flowOn(threadPool.asCoroutineDispatcher())

得益于卸载到线程池的组合,固定大小buffer线程阻塞 CompletableFuture#get ,这段代码符合我的期望 - 最多 threadPoolSize事件被并行处理,并按照接收顺序发送到流。

当我替换CompletableFuture#get时带扩展功能CompletableFuture#await来自kotlinx.coroutines.future并使用flowasync而不是CompletableFuture#supplyAsync ,消息不再并行处理:

fun process(flow: Flow<String>) = flow
    .map { 
        runBlocking {
            future { expensiveHandle(it) } // same behaviour with async {...}
        }
    }
    .buffer(threadPoolSize)
    .map { it.await() }
    .flowOn(threadPool.asCoroutineDispatcher())

我可以使用协程/挂起函数执行等效代码吗?

最佳答案

async 以及 future 都是 CoroutineScope 的扩展函数。所以,你需要一些 CoroutineScope 来调用它们。

runBlocking 提供了一些 CoroutineScope,但它是一个阻塞调用,因此它在 suspend 函数中的使用 is prohibited .

您可以使用GlobalScope.async,但它也是not recommended并且执行将由 Dispatchers.Default 分派(dispatch),而不是由 threadPool.asCoroutineDispatcher() 分派(dispatch),如 CompletableFuture 的原始示例中那样。

coroutineScopewithContext 函数将提供 CoroutineScope,它从外部作用域继承其 coroutineContext,因此流程处理将被暂停并立即执行 expenseHandle(it) 协程。

您需要使用工厂函数创建 CoroutineScope,这样协程上下文就不会混合:

fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
    val dispatcher = threadPool.asCoroutineDispatcher()
    return flow
        .map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
        .buffer(threadPool.poolSize)
        .map { it.await() }
        .flowOn(dispatcher)
}

关于Kotlin 流程顺序异步处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66868578/

相关文章:

bitwise-operators - Kotlin 等价于按位或赋值 '|=' 是什么?

ScalaTest AsyncFunSuiteLike 多个断言

scala - 为什么当我执行 Await.result 时我的 Scala 异步测试永远不会完成?

java - 为什么异常完成前调用get()会等待异常执行?

C# .NET FTP 缺少依赖项?文件传输协议(protocol)

javascript - Jasmine - 在执行测试之前等待异步库完全加载

从内部使迭代器映射超时的 Scala 惯用方法?

java - Kotlin - 无法解析 JSON

android - 修饰符工厂函数不应标记为@Composable

kotlin - Kotlin 函数中接收者和参数的类型相同