我试图将对服务器的调用捆绑到最大 maxEntries,但不想等待超过 maxWait 毫秒。这曾经在 RxJS 4 中作为 windowWithTimeOrCount()
提供,但已从 RxJS 5 中删除。
一切都工作得很好,除了窗口的最后一个元素丢失了。说到“迷失”——这就是我现在的感受。有 RxJS 大师可以告诉我我做错了什么吗?
private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {
// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries / maxWait (whatever comes first).
const toggleSubject = new Subject<void>();
return queue
// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)
// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}
由于 if (count > (maxEntries))
触发 toggleSubject.next(null)
的元素丢失(不在任何窗口中)。
编辑:maxTime 在新 Observable 的第一个元素被推送时开始计时。 if(计数 === 1)
。这是 a) 我在 map()
中的窗口化 Observables 内部工作的原因,b) 很重要,因为这是必需的行为。
示例:maxElements: 100,maxWait: 100。在 t=99 时推送了 101 个元素。预期行为:在 t=99 时,推送具有 100 个元素的 Observable。剩余 1 个元素。计数器+定时器复位。在 t=199 时,第二个“ block ”的计数器到期并推送具有 1 个元素的 Observable。
(在这个例子中,Brandons(参见答案)代码将 - 如果我正确地阅读它 - 在 t=99 处推送一个包含 100 个元素的 Observable,一毫秒后,在 t=100 处,推送一个 Observable与一个元素。)
最佳答案
是的,您不想使用 map
来产生这样的副作用。正如您所注意到的,您最终会掉落元素。
这是一个通用方法,我认为它可以满足您的需求。
注意:RXJS 5 目前有 issue以及此发布重载的类型定义。我添加了一些类型转换,应该允许它在 TypeScript 中编译。
chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait
const timer = IntervalObservable.create(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);
// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}
编辑:
如果您不希望计时器在第一个项目到达每个窗口后才启动,那么您可以这样做:
chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
// use publish() so that we can subscribe multiple times to the same stream of data.
return queue.publish(entries => {
// observable which will trigger after maxWait after the first
// item in this window arrives:
const timer = entries.take(1).delay(maxWait);
// observable which will trigger after maxEntries
const limit = entries.take(maxEntries).last();
// observable which will trigger on either condition
const endOfWindow = limit.takeUntil(timer);
// use endOfWindow to close each window.
return entries.windowWhen(() => endOfWindow) as Observable<T>;
}) as Observable<Observable<T>>;
}
关于javascript - RxJS - 对 maxWait 和 maxElements 窗口使用 windowWhen(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39209560/