我有两个 RX 无限流(我们称它们为 mainValues
和 decoratorValues
)。名为 mainValues
的函数将元素添加到名为 valueList
的列表中。另一个流 (decoratorValues
) 应该为列表中的这些元素分配属性。
两个流中的元素以随机顺序在随机时间到达,我需要以 mainValues
放入列表中的任一顺序来解决它一旦可用,而 decoratorValues
不会丢失,直到其相应的元素(由 ID 标识)到达。
const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b' }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];
// delay here is only for demonstration purpose
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.fromArray(mainValues).delay(2000);
const decoratorStream = Rx.Observable.fromArray(decoratorValues);
const valueList = [];
mainValueStream.subscribe(
val => valueList.push(val),
(err) => console.log(err),
() => console.log('complete', valueList));
// CODE time independent merging of the two streams go here
// resulting valueList should be: [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b', decoratedValue: false }, { id: 3, someValue: 'c', decoratedValue: true } ]
解决方案思路
mainValueStream
.concatMap(main => {
decoratorStream
.filter(dec => dec.id === main.id)
.do(dec => {
main.decoratedValue = dec.decoratedValue;
}).subscribe(x => x);
return [main];
})
.subscribe(
val => valueList.push(val),
(err) => console.log(err),
() => console.log('complete', valueList));
不幸的是,这仅当所有装饰器值在主值流之前到达时才有效。
我的第二个想法是添加第二个流,用于检查给定值是否已在 valueList
中,如果它具有具有适当 ID 的元素,则更新它。
是否存在一种与时间无关的解决方案,仅产生一个流?或者也许我被困住了,因为我想用一个流来解决它?
最佳答案
看起来最好的选择是这里的forkJoin()
。我正在考虑使用 zip()
但由于每个列表中的项目顺序和数量不同,您可能必须等到两者都完成后再处理它们的结果。
const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b' }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];
// delay here is only for demonstration purpose
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.from(mainValues).delay(500).toArray();
const decoratorStream = Rx.Observable.from(decoratorValues).toArray();
Observable.forkJoin(mainValueStream, decoratorStream, (main, decorator) => {
main.forEach(m => {
decorator.forEach(d => {
if (m.id === d.id) {
Object.assign(m, d);
}
})
});
return main;
})
.subscribe(val => console.log(val));
我在 forkJoin()
的结果选择器函数中使用两个嵌套的 forEach()
,因为它看起来很简单(请注意 Object .assign
自 ES6 起就存在)。我可以将它分成两个 Observable 流,然后过滤并合并它们,但这会变得非常困惑。
这会打印到控制台:
[ { id: 1, someValue: 'a' },
{ id: 2, someValue: 'b', decoratedValue: false },
{ id: 3, someValue: 'c', decoratedValue: true } ]
编辑:
如果您需要所有内容都是异步的,那么您可以使用 scan()
在装饰器到达时收集它们:
const mainValueStream = Rx.Observable.from(mainValues)
.concatMap(val => Observable.of(val).delay(Math.random() * 1000));
const decoratorStream = Rx.Observable.from(decoratorValues)
.concatMap(val => Observable.of(val).delay(Math.random() * 1000))
.scan((acc, val) => {
acc.push(val);
return acc;
}, [])
.share();
mainValueStream
.mergeMap(main => decoratorStream
.mergeAll()
.filter(d => d.id === main.id)
.defaultIfEmpty(null)
.map(d => {
if (d) {
return Object.assign(main, d);
} else {
return main;
}
})
.take(1)
)
.subscribe(val => console.log(val));
这会在结果到达时按随机顺序打印结果:
{ id: 2, someValue: 'b', decoratedValue: false }
{ id: 1, someValue: 'a' }
{ id: 3, someValue: 'c', decoratedValue: true }
核心是在 decoratorStream
到达时收集它们。然后,在 mainValueStream
中的每个项目上,我创建另一个 Observable,它在收到其匹配的装饰项目之前不会发出。
这样做的缺点是 scan()
中的累加器仍在增长,并且没有简单的方法可以释放已使用的项目。
关于javascript - RxJs:从 ID 标识的其他流更新列表中的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42349778/