我有一个定期向我发送数据包的进程,我需要根据数据包到达的时间等来管理该流。在某些时候,我还会关闭流和进程。
现在,我正在使用一组计时器来执行此操作,但我希望我可以使用 rxjs
来执行此操作,因为它似乎非常适合此类操作。到目前为止,我还没有取得太大的成功。
问题
流本应定期给我发送数据包,但它通常会偏离很多,有时会卡住。
在以下情况下,我想在某个时候关闭流:
- 如果向我发送第一个数据包所需的时间超过
startDelay
。 - 第一个数据包发送后,如果两个数据包之间有超过
middleDelay
的停顿。 - 经过一个恒定的时间段
maxChannelTime
。
当我由于上述任何原因要关闭流时,我首先请求它礼貌地关闭,以便它可以进行一些清理。有时它还会在清理期间向我发送最终数据包。但我希望在关闭流并忽略任何更多消息之前等待清理和最后数据到达的时间不超过 cleanupTime
。
阐述
我将通过用 Observable 包装事件来创建“流”。我这样做没有问题。
通过“关闭”流,我的意思是告诉进程停止发送数据,并可能关闭(即死亡)。
最佳答案
棘手的问题。
我将其分为两个阶段 - “受监管”(因为我们想定期检查)和“清理”。
逆向计算,输出为
const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)
“关闭器”是在关闭时发出的可观察对象(每个超时值一个关闭器)。
const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300
const startCloser = Observable.timer(startTimeout) // emit once after initial delay
.takeUntil(source) // cancel after source emits
.mapTo('startTimeoutMarker')
const intervalCloser = source.switchMap(x => // reset interval after each source emit
Observable.timer(intervalTimeout) // emit once after intervalTimeout
.mapTo('intervalTimeoutMarker')
)
const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime
.takeUntil(startCloser) // cancel if startTimeout
.takeUntil(intervalCloser) // cancel if intervalTimeout
.mapTo('maxtimeTimeoutMarker')
const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)
const cleanupCloser = close.switchMap(x => // start when close emits
Observable.timer(cleanupTimeout) // emit once after cleanup time
)
.mapTo('cleanupTimeoutMarker')
这是一个工作示例 CodePen (请一次运行一个测试)
关于javascript - 基于 rxjs 中的时间处理事件流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47120760/