有zip压缩两个函数 Flows
.有什么要 zip 三 (或更多)Flows
一起?
如果没有,你能帮我实现它的扩展功能吗?就像是:
flow.zip(flow2, flow3) { a, b, c ->
}
最佳答案
您可以查看 zip
运算符实现并尝试 复制/模拟 它是如何工作的,使其适应您的需求。
测试它并进行您需要的所有更改
fun <T1, T2, T3, R> Flow<T1>.zip(flow2: Flow<T2>, flow3: Flow<T3>, transform: suspend (T1, T2, T3) -> R): Flow<R> = channelFlow {
val first: ReceiveChannel<T1> = produce {
this@zip.collect {
channel.send(it)
}
}
val second: ReceiveChannel<T2> = produce {
flow2.collect {
channel.send(it)
}
}
val third: ReceiveChannel<T3> = produce {
flow3.collect {
channel.send(it)
}
}
(second as SendChannel<*>).invokeOnClose {
if (!first.isClosedForReceive) first.cancel(MyFlowException())
if (!third.isClosedForReceive) third.cancel(MyFlowException())
}
(third as SendChannel<*>).invokeOnClose {
if (!first.isClosedForReceive) first.cancel(MyFlowException())
if (!second.isClosedForReceive) second.cancel(MyFlowException())
}
val otherIterator = second.iterator()
val anotherIterator = third.iterator()
try {
first.consumeEach { value ->
if (!otherIterator.hasNext() || !anotherIterator.hasNext()) {
return@consumeEach
}
send(transform(value, otherIterator.next(), anotherIterator.next()))
}
} catch (e: MyFlowException) {
// complete
} finally {
if (!second.isClosedForReceive) second.cancel(MyFlowException())
if (!third.isClosedForReceive) third.cancel(MyFlowException())
}
}
class MyFlowException: CancellationException()
用法:
flow1.zip(flow2, flow3) { a, b, c ->
//Do your work
}...
关于Kotlin 协程压缩三个流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60930642/