我将 RxJS 与基本的主题对象一起使用,该对象在未知时间接收来自不同位置的输入。这些输入需要堆叠起来并异步发送到服务器 - 但我需要等到 ajax 请求完成后再尝试处理下一组缓冲的输入。
flatMapLatest 不适合我的情况,因为每个响应都必须唯一处理。
我可以在 ajax 请求处理过程中暂停缓冲区,但这意味着我会丢失输入,这是 Not Acceptable 。
我看到缓冲区打开对象只是一个可观察对象,我是否可以以某种方式将计时器可观察对象和自定义可观察对象结合起来,当我的ajax请求完成时触发,并且当发送ajax请求时我可以暂停计时器?
抱歉,我还没有任何代码可以展示,我实际上仍处于设计阶段。
我想做的事情可行还是我的做法不正确?
编辑
这是我到目前为止的代码,它似乎可以实现我想要的功能 - 但仍然愿意改进。
var bufferedIntervalPauser = new Rx.Subject<boolean>();
var pausableInterval = Rx.Observable.interval(500).pausable(bufferedIntervalPauser);
var purStream = this.updateReqestSubject.asObservable()
.buffer(pausableInterval)
.where(b => (b || []).length > 0)
var purSub = purStream.subscribe(
next => {
bufferedIntervalPauser.onNext(false); // pause the buffer window
SomeAjaxMethod(next, {
success: res => {
this.HandleResult(res);
},
always: () => {
// when the ajax completes, resume the buffer
bufferedIntervalPauser.onNext(true);
}
});
},
err => {
console.error(err);
});
// start
bufferedIntervalPauser.onNext(true);
最佳答案
您应该能够仅使用buffer
运算符。正如 @Enigmativity 指出的,Rx 有一个序列化协议(protocol),因此您的处理程序永远不会同时运行,即当它仍在从先前的调用运行时,它不会被调用。然而,如果您的处理程序本身调用非阻塞/asnyc 方法,那么在创建某些异步响应之前,不会有任何阻塞 Rx 来调用您的处理程序。在这种情况下,您可能希望将异步调用包装在可观察的序列中。然后您可以使用像concat
这样的运算符来确保多个可观察序列的序列化
编辑:
这里的问题是您正在订阅中进行异步工作。您需要做的是将您的 SomeAjaxMethod
视为可观察的序列,然后仅使用组合。
//Psuedo code as I dont have a Java IDE at hand
this.updateReqestSubject.asObservable()
.Buffer(500ms)
.Select(buffer=>SomeAjaxMethod(buffer).asObservable().Catch(/*Swallow to match example*/))
.Concat() //Concatenate the IObservable<IObservable<>> into IObservable<>
.Subscribe(
res=>this.HandleResult(res),
err => {
console.error(err);
});
关于javascript - RxJS 在处理下一个缓冲 block 之前等待 ajax 响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35605795/