typescript - 为什么 takeWhile 运算符在包含时不过滤?

标签 typescript rxjs

我正在尝试使用包含选项设置为 true 的 takewhile 运算符,但我遇到了一种我不理解的行为。 我已经能够分离出一小段代码,我可以在其中重现该行为

import { from, BehaviorSubject } from 'rxjs'; 
import { map, takeWhile } from 'rxjs/operators';

const value$ = new BehaviorSubject<number>(1);

const source = value$.pipe(
  map(x => `value\$ = ${x}`),
  takeWhile(x => !x.includes('4'), /*inclusive flag: */true)
);

source.subscribe(x => {
  console.log(x); 
  value$.next(4); // Strange behavior only in this case
  });

解释: 如果没有 inclusive 标志,它会记录 'value$ = 1' 并且流完成

但是,将 inclusive 标志设置为 true 时,它​​会抛出一个 stackoverflow 异常 enter image description here

我的问题是,为什么它不止一次经历 takeWhile 而不是在第一次出现后停止?

这里有一个 bench 的链接,它是否有助于理解: https://stackblitz.com/edit/rxjs-ag4aqx

最佳答案

在对 operator ( https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/takeWhile.ts ) 的源代码进行了一些挖掘之后,确实存在一些错误,我将作为一个问题报告给 github。

与此同时,这是一个固定的自定义 takeWhileInclusive 运算符

import { from, BehaviorSubject, Observable } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';

/** Custom takewhile inclusive Custom takewhile inclusive properly implemented */
const customTakeWhileInclusive = <T>(predicate: (value: T) => boolean) => (source: Observable<T>) => new Observable<T>(observer => {
  let lastMatch: T | undefined // fix
  return source.subscribe({
    next: e => {
      if (lastMatch) {
        observer.complete();
      }
      else {
        if (predicate(e)) {
          /*
           *   Code from https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/takeWhile.ts
           *  
           *   if (this.inclusive) {
           *      destination.next(value); // NO! with a synchronous scheduler, it will trigger another iteration without reaching the next "complete" statement 
           *      and there is no way to detect if a match already occured!
           *   }
           *   destination.complete();
           */

          // Fix:
          lastMatch = e; // prevents from having stackoverflow issue here
        }

        observer.next(e);
      }
    },
    error: (e) => observer.error(e),
    complete: () => observer.complete()
  });
});

const value$ = new BehaviorSubject<number>(1);

const source = value$.pipe(
  map(x => `value\$ = ${x}`),
  //takeWhile(x => !x.includes('4'), true)
  customTakeWhileInclusive(x => x.includes('4'))  // fix
);

source.subscribe(x => {
  console.log(x);
  value$.next(4);
});

实际运算符的问题在于,在同步调度程序上下文中,当匹配发生时它会触发另一次迭代并且永远不会达到“完成”。 正确的实现方式是标记匹配项并执行另一次最后一次迭代,在该迭代中您完成对标记的检测。

链接到更新的 stackblitz:https://stackblitz.com/edit/rxjs-ag4aqx

关于typescript - 为什么 takeWhile 运算符在包含时不过滤?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58064760/

相关文章:

css - 左 Angular 动画不起作用

typescript - 受歧视的联合是否只适用于文字类型?

typescript - React Native Text 不受容器 View 的限制

javascript - 在 RxJS 中链接订阅

angular - 带有 Angular HttpClient 的 RxJs switchMap

javascript - RXJS 在 html5 Canvas 上画线

javascript - 如何更新或删除 ngrx 实体中的嵌套对象?

Typescript 父类对子类的类型保护

typescript - 泛型的动态接口(interface)

javascript - 当我在可观察对象的管道中绘制 map 时,对象的类型是什么?