javascript - 基于 rxjs 中的时间处理事件流

标签 javascript node.js asynchronous rxjs

我有一个定期向我发送数据包的进程,我需要根据数据包到达的时间等来管理该流。在某些时候,我还会关闭流和进程。

现在,我正在使用一组计时器来执行此操作,但我希望我可以使用 rxjs 来执行此操作,因为它似乎非常适合此类操作。到目前为止,我还没有取得太大的成功。

问题

流本应定期给我发送数据包,但它通常会偏离很多,有时会卡住。

在以下情况下,我想在某个时候关闭流:

  1. 如果向我发送第一个数据包所需的时间超过startDelay
  2. 第一个数据包发送后,如果两个数据包之间有超过middleDelay的停顿。
  3. 经过一个恒定的时间段 maxChannelTime

当我由于上述任何原因要关闭流时,我首先请求它礼貌地关闭,以便它可以进行一些清理。有时它还会在清理期间向我发送最终数据包。但我希望在关闭流并忽略任何更多消息之前等待清理和最后数据到达的时间不超过 cleanupTime

阐述

我将通过用 Observable 包装事件来创建“流”。我这样做没有问题。

通过“关闭”流,我的意思是告诉进程停止发送数据,并可能关闭(即死亡)。

最佳答案

棘手的问题。

我将其分为两个阶段 - “受监管”(因为我们想定期检查)和“清理”。

逆向计算,输出为

const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)

“关闭器”是在关闭时发出的可观察对象(每个超时值一个关闭器)。

const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300

const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
  .takeUntil(source)                                // cancel after source emits
  .mapTo('startTimeoutMarker')

const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
    Observable.timer(intervalTimeout)           // emit once after intervalTimeout
      .mapTo('intervalTimeoutMarker')
  )

const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
  .takeUntil(startCloser)                               // cancel if startTimeout
  .takeUntil(intervalCloser)                            // cancel if intervalTimeout
  .mapTo('maxtimeTimeoutMarker')

const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)

const cleanupCloser = close.switchMap(x =>      // start when close emits
     Observable.timer(cleanupTimeout)           // emit once after cleanup time
  ) 
  .mapTo('cleanupTimeoutMarker')

这是一个工作示例 CodePen (请一次运行一个测试)

关于javascript - 基于 rxjs 中的时间处理事件流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47120760/

相关文章:

javascript - 需要了解 JavaScript 数组、对象和属性

javascript - 获取可拖动后面的每个 div 的 id

javascript - 使用 event.preventDefault() 的 Reactjs 默认表单验证

javascript - 尝试使用 Express js 路由解析 API 响应时出现 HPE_INVALID_HEADER_TOKEN

javascript - 在 clinetside javascript var 上保存 nodejs 数据

javascript - 在javascript中将并行字符串化(序列化)到JSON

c# - 如何异步处理?

javascript - 在创建新实例的构造函数闭包内更改 "var"

Node.js 服务器文件执行

asp.net-mvc - ASP.NET MVC 中的异步 Controller