这纯粹是了解运算符的学术练习。如何使用 RXJS 交错传入流
例如从这里开始:
// RxJS v6+
import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));
对此:
import { of } from 'rxjs';
const source1 = of(1,3,5,7,9);
const source2 = of(2,4,6,8,10);
// a simple merge will just append source1 and source2
// how do I obtain an output = 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2).subscribe(val => console.log(val))
约束:这是一个一般性问题,因此我不想使用基于此特定示例的奇数/偶数分隔的谓词,而是纯粹基于传入元素的顺序创建输出,即从每个流中获取第一个,然后第二个,依此类推
这可能吗?
最佳答案
执行此操作的标准方法确实是 merge函数,它可以接收任意数量的可观察量作为参数,并按顺序发出它们的所有项目,无论函数参数中可观察量的顺序如何。但正如您所指出的,它附加了结果,其原因不是合并问题,而是实际上 of同步发出这些值,因此它们都在同一个 javascript 循环中按顺序运行。您可以通过使用异步发出值的 observable 来更改它,或者使用一些可用的异步创建一个 observable Schedulers
异步可观察:
我创建了这个 asyncOf
函数作为 Observable 的模型,其值随着时间的推移而发出,就像 websocket 客户端或用户交互一样。
import { merge, Observable, EMPTY } from 'rxjs';
import { toArray } from 'rxjs/operators';
const asyncOf = <T = any>(args: T[], interval = 500): Observable<T> => {
let i = 0;
let intervalRef;
if(!args?.length) {
return EMPTY;
}
return new Observable(observer => {
intervalRef = setInterval(() => {
if(args[i]) {
observer.next(args[i])
}
++i;
if(!args[i]) {
observer.complete()
if(intervalRef) {
clearInterval(intervalRef)
}
}
}, interval)
})
}
const source1 = asyncOf([1, 3, 5, 7, 9]);
const source2 = asyncOf([2, 4, 6, 8, 10]);
//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
//.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
.subscribe((val) => console.log(val));
在这种情况下,合并工作正常,因为两个可观察对象都异步运行。
异步调度程序
另一种选择是使用一个异步调度程序(或创建您自己的)来描述每个项目应何时被发出。这更接近您的示例,并且更深入地了解 rxjs:
import { merge, scheduled, asyncScheduler } from 'rxjs';
import { toArray } from 'rxjs/operators';
const source1 = scheduled([1, 3, 5, 7, 9], asyncScheduler);
const source2 = scheduled([2, 4, 6, 8, 10], asyncScheduler);
//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
//.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
.subscribe((val) => console.log(val));
Scheduled来自 RXJS 6.5+ 并弃用在其他函数中使用调度程序函数,例如 of、
关于javascript - RXJS:我可以根据单个输出的条目顺序交错多个流吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72856926/