kotlin - 在列表中合并多个Kotlin流,而无需等待第一个值

标签 kotlin kotlin-coroutines kotlin-flow

我有一个List<Flow<T>>,并且想生成一个Flow<List<T>>。这几乎是 combine 做什么 - 除了结合起来,每一个Flow等待发射的初始值,这是不是我想要的。以下面的代码为例:

val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}
使用combine(并按原样),这是输出:
[a2, b1, c]
[a2, b2, c]
而我也对所有中介步骤都感兴趣。这是我从这三个流程中想要的:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
现在我有两种解决方法,但他们都不是伟大的...第一个是难看,不与可空类型的工作:
val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}
通过强制所有流发出不相关的第一个值,确实调用了combine转换器,并且让我删除了我所知不是实际值的空值。对此进行迭代,使可读性更强:
sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}
现在,这个程序可以正常工作,但仍然感觉像是我在做过多的事情。协程库中有我缺少的方法吗?

最佳答案

这个怎么样:

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array= Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}

它解决了一些问题:
  • 无需引入新型
  • []不在结果流
  • 从调用站点抽象出空处理(或解决了该问题),所得的Flow本身处理它

  • 因此,您不会注意到任何特定于实现的变通方法,因为您不必在收集过程中对其进行处理:
    runBlocking {
        instantCombine(a, b, c).collect {
            println(it)
        }
    }
    

    输出:

    [a0]
    [a1]
    [a1, b0]
    [a2, b0]
    [a2, b1]
    [a2, b1, c]
    [a2, b2, c]



    Try it out here!

    编辑:更新了答案以处理也发出空值的流。

    *所使用的低级数组是线程安全的。就像您要处理单个变量一样。

    关于kotlin - 在列表中合并多个Kotlin流,而无需等待第一个值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61185082/

    相关文章:

    reflection - 在运行时确定 Kotlin 的版本

    kotlin - 如何在 Kotlin 中调用具有不同上限的方法?

    android - 初始化后如何修改 Kotlin Coroutine Flow 的值?

    android - 如何在 SAM 中调用挂起函数?

    kotlin - 组合 Flow 和非 Flow api 响应 Kotlin

    Android Studio 3.0支持Kotlin : Activity as Context

    java - Kotlin lateinit 对应java

    kotlin - Android-Things:迁移gpiocallback函数以暂停协程函数

    go - 哪个协程(goroutines 和 kotlin coroutines)更快?

    android - 使用流作为 API 调用的返回类型