javascript - RxJs:从 ID 标识的其他流更新列表中的值

标签 javascript rxjs rxjs5

我有两个 RX 无限流(我们称它们为 mainValuesdecoratorValues)。名为 mainValues 的函数将元素添加到名为 valueList 的列表中。另一个流 (decoratorValues) 应该为列表中的这些元素分配属性。

两个流中的元素以随机顺序在随机时间到达,我需要以 mainValues 放入列表中的任一顺序来解决它一旦可用,而 decoratorValues 不会丢失,直到其相应的元素(由 ID 标识)到达。

const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b'  }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];

// delay here is only for demonstration purpose 
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.fromArray(mainValues).delay(2000);
const decoratorStream = Rx.Observable.fromArray(decoratorValues);

const valueList = [];

mainValueStream.subscribe(
  val => valueList.push(val), 
  (err) => console.log(err),
  () => console.log('complete', valueList));

// CODE time independent merging of the two streams go here

// resulting valueList should be:  [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b', decoratedValue: false }, { id: 3, someValue: 'c', decoratedValue: true } ]

解决方案思路

mainValueStream
  .concatMap(main => {
     decoratorStream
       .filter(dec => dec.id === main.id)
       .do(dec => {
          main.decoratedValue = dec.decoratedValue;
     }).subscribe(x => x);

     return [main];
  })
  .subscribe(
    val => valueList.push(val), 
    (err) => console.log(err),
    () => console.log('complete', valueList));

不幸的是,这仅当所有装饰器值在主值流之前到达时才有效。

我的第二个想法是添加第二个流,用于检查给定值是否已在 valueList 中,如果它具有具有适当 ID 的元素,则更新它。

是否存在一种与时间无关的解决方案,仅产生一个流?或者也许我被困住了,因为我想用一个流来解决它?

最佳答案

看起来最好的选择是这里的forkJoin()。我正在考虑使用 zip() 但由于每个列表中的项目顺序和数量不同,您可能必须等到两者都完成后再处理它们的结果。

const mainValues = [ { id: 1, someValue: 'a' }, { id: 2, someValue: 'b'  }, { id: 3, someValue: 'c' }];
const decoratorValues = [ { id: 3, decoratedValue: true }, { id: 2, decoratedValue: false }];

// delay here is only for demonstration purpose
// a time independent solution can be tested with swapping it between these two
const mainValueStream = Rx.Observable.from(mainValues).delay(500).toArray();
const decoratorStream = Rx.Observable.from(decoratorValues).toArray();

Observable.forkJoin(mainValueStream, decoratorStream, (main, decorator) => {
    main.forEach(m => {
      decorator.forEach(d => {
        if (m.id === d.id) {
          Object.assign(m, d);
        }
      })
    });

    return main;
  })
  .subscribe(val => console.log(val));

我在 forkJoin() 的结果选择器函数中使用两个嵌套的 forEach() ,因为它看起来很简单(请注意 Object .assign 自 ES6 起就存在)。我可以将它分成两个 Observable 流,然后过滤并合并它们,但这会变得非常困惑。

这会打印到控制台:

[ { id: 1, someValue: 'a' },
  { id: 2, someValue: 'b', decoratedValue: false },
  { id: 3, someValue: 'c', decoratedValue: true } ]

编辑:

如果您需要所有内容都是异步的,那么您可以使用 scan() 在装饰器到达时收集它们:

const mainValueStream = Rx.Observable.from(mainValues)
  .concatMap(val => Observable.of(val).delay(Math.random() * 1000));

const decoratorStream = Rx.Observable.from(decoratorValues)
  .concatMap(val => Observable.of(val).delay(Math.random() * 1000))
  .scan((acc, val) => {
    acc.push(val);
    return acc;
  }, [])
  .share();


mainValueStream
  .mergeMap(main => decoratorStream
    .mergeAll()
    .filter(d => d.id === main.id)
    .defaultIfEmpty(null)
    .map(d => {
      if (d) {
        return Object.assign(main, d);
      } else {
        return main;
      }
    })
    .take(1)
  )
  .subscribe(val => console.log(val));

这会在结果到达时按随机顺序打印结果:

{ id: 2, someValue: 'b', decoratedValue: false }
{ id: 1, someValue: 'a' }
{ id: 3, someValue: 'c', decoratedValue: true }

核心是在 decoratorStream 到达时收集它们。然后,在 mainValueStream 中的每个项目上,我创建另一个 Observable,它在收到其匹配的装饰项目之前不会发出。

这样做的缺点是 scan() 中的累加器仍在增长,并且没有简单的方法可以释放已使用的项目。

关于javascript - RxJs:从 ID 标识的其他流更新列表中的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42349778/

相关文章:

javascript - react native : How to define a javascript class

javascript - 检查数字是否在 2 个值之间

JavaScript 闭包和作用域链

javascript - RxJs:如何只维护最新值直到内部可观察完成

javascript - 如何在 RxJs 中实现类似 "bufferWhile"的内容?

javascript - 在redux-observable中异步添加epic到中间件

Javascript : execute a script after location. href

javascript - 一旦 Rx.Observable 中发生错误,ValueChanges 就会停止工作

javascript - 如何取消订阅 RxJS 5 可观察对象?

rxjs - 你如何从 RXJS Observable retryWhen 抛出错误