我有两个优先级不同的流。
每个事件必须播放持续例如 5 秒的声音。 要求是:
- 高优先级事件必须杀死低优先级声音
- 否则每个事件都必须等待前一个声音完成
我试过这种方式也是low kill high event example playground
最佳答案
这很棘手,但我认为通过组合 concatMap
和 takeUntil
是可行的。我留下了一些日志来更清楚地说明它是如何工作的。
const randomStream$ = defer(() => of(null).pipe(
delay(Math.random() * 10000),
)).pipe(take(1), repeat());
let hIndex = 0;
let lIndex = 0;
const high$ = randomStream$.pipe(map(v => `H ${hIndex++}`));
const low$ = randomStream$.pipe(map(v => `L ${lIndex++}`));
merge(high$, low$).pipe(
concatMap(v => {
const sound$ = of(v).pipe(
delay(5000),
);
if (v[0] === 'H') { // Item from the high priority stream
return sound$;
} else {
return sound$.pipe(
takeUntil(high$.pipe(
tap(() => console.log(`${v} canceled`))
)),
);
}
}),
).subscribe(v => console.log(`${v} done`));
现场演示:https://stackblitz.com/edit/rxjs-jdbkhb
所以 high$
和 low$
的所有发射都变成了 sound$
,耗时 5 秒。然后针对不同类型的流进行逻辑拆分:
刚刚返回高优先级流,感谢
concatMap
,sound$
确实需要 5 秒才能完成。低优先级流与
takeUntil
链接,因此当发出另一个high$
时,它会立即完成并且周围的concatMap
开始处理另一个。如果low$
出现,则它由concatMap
排队。
顺便说一句,我不知道你是否想对 low$
和 high$
的发射进行排队,因为如果你有多个 low$
缓冲后,它们将按照与 high$
排放相结合的相同顺序进行处理。
所以也许你会想忽略来自 low$
的所有排放,而 high$
有待处理的排放,但这将需要一个副作用(变量链外)将计算来自 high$
的未决排放量(或者当存在未决的高优先级项目时,可能只是从 concatMap
返回 EMPTY
待定?)。
关于rxjs - 处理两个可观察对象的优先级,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54204079/