javascript - 如何在 rxjs 中链接可观察量

标签 javascript node.js observable rxjs frp

我有一个可观察对象,它从服务器提取事件,过滤应用程序类型的事件,然后订阅事件并将其分派(dispatch)给一个或多个处理程序来处理。

然后处理程序开始对数据库进行一些异步更新,我发现可观察的事件会如此快地发出事件,以至于更新会相互影响。这是我应该预料到的。

所以我认为我需要我的处理程序每​​个都使用自己的可观察量来充当队列,该队列将处理一个事件并等待确认。

所以我的问题是,如何创建一个连续接收消息并一次发送一条消息的可观察对象,在释放下一条消息之前等待确认?

我认为可观察量也必须是冷的,因为我不能丢失消息。

最佳答案

我认为运算符 concatMap 所做的事情与您正在寻找的内容很接近。您可以在此处查看以前的答案,以说明 concatMap 的类似用例: RxJS queueing dependent tasks

它很接近,但不完全是您想要的,因为无需等待 ACK 信号来释放下一个值。相反,concatMap 使用当前“已执行”可观察对象的完成信号来订阅下一个可观察对象。如果您的可观察对象包含在数据库上执行更新,那么这些更新将按顺序执行。例如:

function handler (source$) {
  // source$ is your source of events from which you generate the update calls
  return source$.concatMap(function (event){
    return updateDB(event);
  })
}

function updateDB(event) {
  return Rx.Observable.create(function(observer){
    // do the update in the db
    // you probably have a success and error handler 
    // you plug the observer notification into those handlers
    if (success) {
      // if you need to pass down some value from the update
      observer.onNext(someValue);
      // In any case, signal completion to allow concatMap to move to next update
      observer.onCompleted();
    }
    if (error) {observer.onError(error);}
  })
}

这是专门针对您正在使用的库的通用代码。您也许可以直接使用运算符 fromNodeCallbackfromCallback,具体取决于数据库更新函数的 API。

尽管如此,请注意,在执行当前可观察数据时,可能会涉及一些缓冲来保留下一个可观察数据,并且该缓冲区只能是有限的,因此如果生产者之间的速度确实存在显着差异和消费者或内存限制,您可能希望以不同的方式处理事情。

此外,如果您使用的是 RxJS v5,onError 会变为 erroronComplete 会变为 completeonNext 变为 next (参见 new observer interface )。

最后一条评论,流的有损/无损性质是与流的热与冷性质不同的概念。您可以看看illustrated subscription and data flows对于这两种类型的流。

关于javascript - 如何在 rxjs 中链接可观察量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35158387/

相关文章:

javascript - 使用javascript在php中计时器达到零时不调用提交方法

javascript - Webpack postcss-loader 在传递函数时不工作

javascript - AngularJS ng-disabled 不使用列表项

javascript - Express 和 Jade 中内联 jade.render() ?

python - 使用 websockets 是 IPC 的好主意吗?

javascript - 如何为某些特定的非 DOM 事件创建 Rx observable

javascript - 缩写的正则表达式

javascript - Angular2 重复标识符 'PropertyKey'

c# - 当其他可观察值为 true 时,从可观察值中获取项目

.net - 如何获取作为 IObservable 公开的主题的当前值?