kotlin - 使用 RxJava 构建 "task queue"的最佳方式

标签 kotlin queue rx-java reactive-programming rx-java2

目前我正在研究许多与网络相关的功能。目前,我正在处理一个允许我一次发送一条信息的网络 channel ,我必须等待它被确认才能发送下一条信息。我用 1..n 代表服务器连接的客户端。

其中一些消息,我必须以 block 的形式发送,这使用 RxJava 很容易做到。目前我的“写作”方法看起来有点像这样:

fun write(bytes: ByteArray, ignoreMtu: Boolean) = 
    server.deviceList()
            .first(emptyList())
            .flatMapObservable { devices ->
                Single.fromCallable {
                    if (ignoreMtu) {
                        bytes.size
                    } else {
                        devices.minBy { device -> device.mtu }?.mtu ?: DEFAULT_MTU
                    }
                }
                        .flatMapObservable { minMtu ->
                            Observable.fromIterable(bytes.asIterable())
                                    .buffer(minMtu)
                        }
                        .map { it.toByteArray() }
                        .doOnNext { server.currentData = bytes }
                        .map { devices }
                        // part i've left out: waiting for each device acknowledging the message, timeouts, etc.
            }

这里缺少的是我只允许 one 的部分同时发送一条信息。另外,我需要的是,如果我要将一条消息添加到队列中,我必须能够仅观察到这条消息的状态(已完成、错误)。

我想过什么是实现这一目标的最优雅的方式。我提出的解决方案包括例如 PublishSubject<ByteArray>我在其中推送消息(类似队列),添加订阅者并观察它 - 但这会抛出例如 onError如果上一条消息失败。

我想到的另一种方法是在创建/排队时给每条消息一个数字,并有一个全局“消息计数器”Observable,我会用 filter 挂接到链的开头。对于 currently sent message == MY_MESSAGE_ID .但这感觉有点脆弱。我可以在订阅终止时增加计数器,但我确信必须有更好的方法来实现我的目标。

感谢您的帮助。

最佳答案

供将来引用:我发现的最直接的方法是添加一个在单个线程上工作的调度程序,从而按顺序处理每个任务。

关于kotlin - 使用 RxJava 构建 "task queue"的最佳方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49960751/

相关文章:

constructor - 在 Kotlin 中匿名实现接口(interface)导致 "has no constructors"错误

kotlin - Kotlin中的引用平等

javascript - (jQuery) .hover() 中淡入/淡出元素的问题

android - Retrofit + RxJava 中的链接请求

java - 最后 onNext() 执行完成的 RxJava 回调

css - TornadoFX addClass 不添加CSS

kotlin - 奇怪的kotlin checkNotNullParameter错误

queue - NServiceBus、MassTransit 和 Rabbit MQ 还是 Kafka?

c# - 队列中有新项目时如何出队

android - 使用 Retrofit 2.0 和 RxJava 获取响应状态码