我有一个List<Flow<T>>
,并且想生成一个Flow<List<T>>
。这几乎是 combine
做什么 - 除了结合起来,每一个Flow
等待发射的初始值,这是不是我想要的。以下面的代码为例:
val a = flow {
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
combine(flows) {
it.toList()
}.collect { println(it) }
}
使用combine
(并按原样),这是输出:[a2, b1, c]
[a2, b2, c]
而我也对所有中介步骤都感兴趣。这是我从这三个流程中想要的:[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
现在我有两种解决方法,但他们都不是伟大的...第一个是难看,不与可空类型的工作:val flows = listOf(a, b, c).map {
flow {
emit(null)
it.collect { emit(it) }
}
}
runBlocking {
combine(flows) {
it.filterNotNull()
}.collect { println(it) }
}
通过强制所有流发出不相关的第一个值,确实调用了combine
转换器,并且让我删除了我所知不是实际值的空值。对此进行迭代,使可读性更强:sealed class FlowValueHolder {
object None : FlowValueHolder()
data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
flow {
emit(FlowValueHolder.None)
it.collect { emit(FlowValueHolder.Some(it)) }
}
}
runBlocking {
combine(flows) {
it.filterIsInstance(FlowValueHolder.Some::class.java)
.map { it.value }
}.collect { println(it) }
}
现在,这个程序可以正常工作,但仍然感觉像是我在做过多的事情。协程库中有我缺少的方法吗?
最佳答案
这个怎么样:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
val array= Array(flows.size) {
false to (null as T?) // first element stands for "present"
}
flows.forEachIndexed { index, flow ->
launch {
flow.collect { emittedElement ->
array[index] = true to emittedElement
send(array.filter { it.first }.map { it.second })
}
}
}
}
它解决了一些问题:
[]
不在结果流因此,您不会注意到任何特定于实现的变通方法,因为您不必在收集过程中对其进行处理:
runBlocking {
instantCombine(a, b, c).collect {
println(it)
}
}
输出:
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Try it out here!
编辑:更新了答案以处理也发出空值的流。
*所使用的低级数组是线程安全的。就像您要处理单个变量一样。
关于kotlin - 在列表中合并多个Kotlin流,而无需等待第一个值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61185082/