我有一个可观察对象,它从服务器提取事件,过滤应用程序类型的事件,然后订阅事件并将其分派(dispatch)给一个或多个处理程序来处理。
然后处理程序开始对数据库进行一些异步更新,我发现可观察的事件会如此快地发出事件,以至于更新会相互影响。这是我应该预料到的。所以我认为我需要我的处理程序每个都使用自己的可观察量来充当队列,该队列将处理一个事件并等待确认。
所以我的问题是,如何创建一个连续接收消息并一次发送一条消息的可观察对象,在释放下一条消息之前等待确认?
我认为可观察量也必须是冷的,因为我不能丢失消息。
最佳答案
我认为运算符 concatMap
所做的事情与您正在寻找的内容很接近。您可以在此处查看以前的答案,以说明 concatMap
的类似用例:
RxJS queueing dependent tasks
它很接近,但不完全是您想要的,因为无需等待 ACK
信号来释放下一个值。相反,concatMap 使用当前“已执行”可观察对象的完成信号来订阅下一个可观察对象。如果您的可观察对象包含在数据库上执行更新,那么这些更新将按顺序执行。例如:
function handler (source$) {
// source$ is your source of events from which you generate the update calls
return source$.concatMap(function (event){
return updateDB(event);
})
}
function updateDB(event) {
return Rx.Observable.create(function(observer){
// do the update in the db
// you probably have a success and error handler
// you plug the observer notification into those handlers
if (success) {
// if you need to pass down some value from the update
observer.onNext(someValue);
// In any case, signal completion to allow concatMap to move to next update
observer.onCompleted();
}
if (error) {observer.onError(error);}
})
}
这是专门针对您正在使用的库的通用代码。您也许可以直接使用运算符 fromNodeCallback
或 fromCallback
,具体取决于数据库更新函数的 API。
此外,如果您使用的是 RxJS v5,onError
会变为 error
,onComplete
会变为 complete
,onNext
变为 next
(参见 new observer interface )。
最后一条评论,流的有损/无损性质是与流的热与冷性质不同的概念。您可以看看illustrated subscription and data flows对于这两种类型的流。
关于javascript - 如何在 rxjs 中链接可观察量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35158387/