rx-java - 如何使用 RxJava 和 Kotlin 进行 groupBy 和收集?

标签 rx-java reactive-programming kotlin rx-kotlin

我有 Observable<Rates>而 Rate 只是一个简单的对象:

Rate(val value:String){}
Rates(val rates: List<Rate>)

我想改变 Observable<Rates>进入 Observable<HashMap<String,Long> .

例如对于费率 Rates(arrayOf(Rate("1"),Rate("2"), Rate("3"),Rate("3"), Rate("2"),Rate("2")))我期待结果:

(1 -> 1)
(2 -> 3)
(3 -> 2)
(4 -> 0)
(5 -> 0)

我开始创造类似的东西:

service.getRates()
        .flatMap {it-> Observable.from(it.rates) }
        .filter { !it.value.isNullOrEmpty() }
        .groupBy {it -> it.value}
        .collect({ HashMap<String,Long>()}, { b, t -> b.put(t.key, t.count???)}

但我被困在这里,我不知道计算所有值?如果没有 5 of 4,我不知道如何添加空值 (0)。有没有办法使用 rx 来做到这一点?

最佳答案

请查看代码中的注释以获得问题的答案。

import rx.Observable

fun main(args: Array<String>) {
    val service = Service()

    // This adds all keys with each key mapped to zero
    val referenceKeyCounts = Observable
        .just("1", "2", "3", "4", "5")
        .map { it to 0 }

    val keyCountsFromService = service.getRates()
        .flatMap { Observable.from(it.rates) }
        .filter { !it.value.isNullOrEmpty() }
        .map { it.value to 1 } // map each occurrence of key to 1

    Observable.concat(referenceKeyCounts, keyCountsFromService)
        .groupBy { it.first }
        .flatMap { group ->  // this converts GroupedObservable to final values
            group.reduce(0, { acc, pair -> acc + pair.second }) // add instead of counting
                .map { group.key to it }
        }
        .subscribe(::println)

}

class Service {
    fun getRates(): Observable<Rates> = Observable.just(Rates(listOf(
        Rate("1"), Rate("2"), Rate("3"), Rate("3"), Rate("2"), Rate("2")
    )))
}

class Rate(val value: String)

class Rates(val rates: List<Rate>)

关于rx-java - 如何使用 RxJava 和 Kotlin 进行 groupBy 和收集?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40335952/

相关文章:

android - Observable.retrywhen 中的异常类型

asynchronous - 在 render 中调用 setState 是不可避免的

android - Kotlin-reflect 和 android gradle 插件 3.1.0

android - w : Detected multiple Kotlin daemon sessions at build/kotlin/sessions

java - Reactor中的doAfterSucces和doOnSuccess有什么区别?

rx-java - 在 RxJava2 中使用 flatMap 还是 zip?

android - RxScala/Java - 为什么我的 progressBar 没有显示?

spring - 如何设计具有外部阻塞 API 调用的响应式(Reactive)微服务?

javascript - rxjs 科目应该在类里面公开吗?

android - 我如何在运行时更改改造 URL