rxjs - 处理两个可观察对象的优先级

标签 rxjs

我有两个优先级不同的流。

每个事件必须播放持续例如 5 秒的声音。 要求是:

  1. 高优先级事件必须杀死低优先级声音
  2. 否则每个事件都必须等待前一个声音完成

我试过这种方式也是low kill high event example playground

最佳答案

这很棘手,但我认为通过组合 concatMaptakeUntil 是可行的。我留下了一些日志来更清楚地说明它是如何工作的。

const randomStream$ = defer(() => of(null).pipe(
  delay(Math.random() * 10000),
)).pipe(take(1), repeat());

let hIndex = 0;
let lIndex = 0;
const high$ = randomStream$.pipe(map(v => `H ${hIndex++}`));
const low$ = randomStream$.pipe(map(v => `L ${lIndex++}`));

merge(high$, low$).pipe(
  concatMap(v => {
    const sound$ = of(v).pipe(
      delay(5000),
    );

    if (v[0] === 'H') { // Item from the high priority stream
      return sound$;
    } else {
      return sound$.pipe(
        takeUntil(high$.pipe(
          tap(() => console.log(`${v} canceled`))
        )),
      );
    }
  }),
).subscribe(v => console.log(`${v} done`));

现场演示:https://stackblitz.com/edit/rxjs-jdbkhb

所以 high$low$ 的所有发射都变成了 sound$,耗时 5 秒。然后针对不同类型的流进行逻辑拆分:

  • 刚刚返回高优先级流,感谢 concatMapsound$ 确实需要 5 秒才能完成。

  • 低优先级流与 takeUntil 链接,因此当发出另一个 high$ 时,它会立即完成并且周围的 concatMap 开始处理另一个。如果 low$ 出现,则它由 concatMap 排队。

顺便说一句,我不知道你是否想对 low$high$ 的发射进行排队,因为如果你有多个 low$ 缓冲后,它们将按照与 high$ 排放相结合的相同顺序进行处理。

所以也许你会想忽略来自 low$ 的所有排放,而 high$ 有待处理的排放,但这将需要一个副作用(变量链外)将计算来自 high$ 的未决排放量(或者当存在未决的高优先级项目时,可能只是从 concatMap 返回 EMPTY待定?)。

关于rxjs - 处理两个可观察对象的优先级,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54204079/

相关文章:

Angular2 NGRX 调度时出现性能问题?

javascript - RxJS 合并 2 个订阅

javascript - 无法以 Angular 显示用户名

javascript - 为什么RxJS运算符过滤和映射大尺寸数组比Javascript数组方法更快?

javascript - RxJs:在 flatMapLatest 完成后访问 flatMapLatest 之前的数据

javascript - 我可以将管道添加到现有的可观察对象中吗

angular - 类型错误 : Object doesn't support property or method 'map' - AngularJS2

javascript - 可观察对象相对于异步迭代器的优势

javascript - RxJS 5 子类化 Observable - 静态方法返回父类的实例

rxjs - 如何使用 rxjs bindCallback