javascript - RxJS - 对 maxWait 和 maxElements 窗口使用 windowWhen()

标签 javascript rxjs rxjs5

我试图将对服务器的调用捆绑到最大 maxEntries,但不想等待超过 maxWait 毫秒。这曾经在 RxJS 4 中作为 windowWithTimeOrCount() 提供,但已从 RxJS 5 中删除。

一切都工作得很好,除了窗口的最后一个元素丢失了。说到“迷失”——这就是我现在的感受。有 RxJS 大师可以告诉我我做错了什么吗?

 private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {

    // We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
    // but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
    // complete after maxEntries / maxWait (whatever comes first).
    const toggleSubject = new Subject<void>();

    return queue

    // Start emitting a new Observable every time toggleSubject emits.
    // (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
    // complete collection)
      .windowWhen(() => toggleSubject)

      // map() is called once for every window (maxEntries/maxWait)
      // the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
      // count all elements, then emitting on toggleSubject, triggering a new Observable.
      // (We have to map() here - instead of an outer do() -  because otherwise the original obs would be streamed
      // and the hooked up version with the inner do() would never be called.)
      .map((obs) => {
        // counts the number of cacheEntries already in this stream
        let count = 0;
        // flag to kill the timeout callback
        let done = false;
        // we have to return an Observable
        return obs.do(() => {
            count++;
            if (count === 1) {
              // we start counting when the first element is streamed.
              IntervalObservable.create(maxWait).first().subscribe(() => {
                if (!done) {
                  //trigger due to maxWait
                  toggleSubject.next(null);
                }
              });
            }
            if (count > (maxEntries)) {
              done = true;
              // trigger due due to maxEntries(' + maxEntries + ')');
              toggleSubject.next(null);
            }
          }
        );
      });
  }

由于 if (count > (maxEntries)) 触发 toggleSubject.next(null) 的元素丢失(不在任何窗口中)。

编辑:maxTime 在新 Observable 的第一个元素被推送时开始计时。 if(计数 === 1)。这是 a) 我在 map() 中的窗口化 Observables 内部工作的原因,b) 很重要,因为这是必需的行为。

示例:maxElements: 100,maxWait: 100。在 t=99 时推送了 101 个元素。预期行为:在 t=99 时,推送具有 100 个元素的 Observable。剩余 1 个元素。计数器+定时器复位。在 t=199 时,第二个“ block ”的计数器到期并推送具有 1 个元素的 Observable。

(在这个例子中,Brandons(参见答案)代码将 - 如果我正确地阅读它 - 在 t=99 处推送一个包含 100 个元素的 Observable,一毫秒后,在 t=100 处,推送一个 Observable与一个元素。)

最佳答案

是的,您不想使用 map 来产生这样的副作用。正如您所注意到的,您最终会掉落元素。

这是一个通用方法,我认为它可以满足您的需求。

注意:RXJS 5 目前有 issue以及此发布重载的类型定义。我添加了一些类型转换,应该允许它在 TypeScript 中编译。

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
    // use publish() so that we can subscribe multiple times to the same stream of data.
    return queue.publish(entries => {
        // observable which will trigger after maxWait
        const timer = IntervalObservable.create(maxWait);
        // observable which will trigger after maxEntries
        const limit = entries.take(maxEntries).last();
        // observable which will trigger on either condition
        const endOfWindow = limit.takeUntil(timer);

        // use endOfWindow to close each window.
        return entries.windowWhen(() => endOfWindow) as Observable<T>;
    }) as Observable<Observable<T>>;
}

编辑:

如果您不希望计时器在第一个项目到达每个窗口后才启动,那么您可以这样做:

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
    // use publish() so that we can subscribe multiple times to the same stream of data.
    return queue.publish(entries => {
        // observable which will trigger after maxWait after the first
        // item in this window arrives:
        const timer = entries.take(1).delay(maxWait);
        // observable which will trigger after maxEntries
        const limit = entries.take(maxEntries).last();
        // observable which will trigger on either condition
        const endOfWindow = limit.takeUntil(timer);

        // use endOfWindow to close each window.
        return entries.windowWhen(() => endOfWindow) as Observable<T>;
    }) as Observable<Observable<T>>;
}

关于javascript - RxJS - 对 maxWait 和 maxElements 窗口使用 windowWhen(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39209560/

相关文章:

angular - 数据更改时重新创建组件

javascript - jqueryUI 可排序 : Uncaught Error: HIERARCHY_REQUEST_ERR: DOM Exception 3

angular - 如何使用rxjs的过滤器运算符?

websocket - 如何防止websocket在angular5中的特定时间后关闭?

rxjs - 何时在 rxjs 中使用 asObservable() ?

angular - 与可观察对象并行读取文件

javascript - 使用 AngularJS 总结两个 HTML 模板绑定(bind)的内容

javascript - 使用 Redux 防止重复的最佳实践是什么?

javascript - 如何像jQuery一样创建多个不同的实例?

javascript - 错误是否被视为 RxJ 中可观察量的发射?