angular - 如何连接嵌套在另一个 Observable 中的多个 Observable 集合

标签 angular merge rxjs observable concatenation

我是 RxJs 的新手,我在 Angular 应用程序的复杂场景中苦苦挣扎,我在其他可观察对象中嵌套了可观察对象(下文会简化)。

简而言之,我有:

  • 一个 Observable,它返回一个包含对象集合的流 SearchActivity => Observable<SearchActivity[]>
  • 每个 SearchActivity 对象都有一个 Observable 属性 'results$' => BehaviorSubject<SearchActivityResult[]>

我尝试将所有事件中的所有结果提取到一个 Observable 中。 所以,从下面的例子(这不是代码,而是一种数据结构的思想):

Observable<SearchActivity[]> => [
 {id: 1, results$: Observable<SearchActivityResult[]> => [1, 2]},
 {id: 2, results$: Observable<SearchActivityResult[]> => [3, 4, 5]} 
 {id: 3, results$: Observable<SearchActivityResult[]> => [6]} 
]

我想提取结果得到:

Observable<SearchActivityResult[]> => [1, 2, 3, 4, 5, 6] (Edit: the order is not important)

如何使用 RxJs 实现这一点?

到目前为止,我得到了类似的东西:

// This is not working properly
this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => {
    return concat(...activities?.map(activity => activity.results$ || of([])));
  })
);

但它似乎没有按预期工作。

我准备了一个StackBiltz with a more complex scenario here

更新 2021-02-23

我创建了一个新的 StackBlitz您可以在其中通过单击与结果进行交互。目标是获得每个数据库的所有结果的摘要(页面底部的表格),并显示所有搜索条件的正面/负面结果的数量。

TL;DR => 解决方案

我刚刚创建了一个StackBlitz以较低的复杂性来隔离问题。我已经包含了 Mrk Sef 提出的建议它正在工作。 我还有其他一些关于 flat 运算符的错误和对 json 的奇怪循环引用,但这更可能是由 StackBlitz 模拟器引起的。

最佳答案

我认为您的想法是正确的,但您需要最后一步才能将流减少为您的值(value)。

你写了什么:

this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => 
    concat(...activities?.map(
      activity => activity?.results$ || of([])
    ))
  )
);

activities 中的每个条目都有一个发射,但您不想要一系列发射,您想要一个数组来合并/展平结果。

你可以尝试这样的事情:

this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => 
    merge(...activities?.map(
      activity => activity?.results$ || of([])
    ))
  ),
  toArray(),
  map(res => res.flat())
);

this.allResults$ = this.displayedActivities$.pipe(
  mergeMap(activities => 
    merge(...activities?.map(
      activity => activity?.results$ || of([])
    ))
  ),
  reduce((acc, curr) => [...acc,...curr], [])
);

更新:

您可以运行这个最小示例:

const displayedActivities$ = of([
  {results$: of([1,2,3])},
  {results$: of([10,20,30])},
  {something: "else"},
  {results$: of([100,200,300])}
]);

const allResults$ = displayedActivities$.pipe(
  mergeMap(activities => 
    merge(...activities.filter(
      activity => activity?.results$ != null
    ).map(
      activity => activity.results$
    ))
  ),
  reduce((acc, curr) => [...acc,...curr], [])
).subscribe(console.log);

输出:

[1, 2, 3, 10, 20, 30, 100, 200, 300]

更新#2

这就是您可能如何处理将属性 result$ observables 更改为 long-lived observables。和上面的toArray()Array.flat()差不多,只是现在用的是combineLatest

这是应该运行的代码。它将保留每个可观察到的最新发射作为最终组合输出的一部分:

模拟设置

创建发射对象数组的 displayedActivities$,每个对象都有一个属性 result$,在完成之前发射一定次数(在本例中为 3 次)。

/****
 * Pipeable Operator:
 * Takes arrays emitted by the source and spaces out their
 * values by the given interval time in milliseconds
 ****/
function intervalArray<T>(intervalTime = 1000): OperatorFunction<T[], T> {
  return pipe(
    concatMap((v: T[]) =>
      concat(
        ...v.map((value: T) =>
          EMPTY.pipe(
            delay(intervalTime),
            startWith(value)
          )
        )
      )
    )
  );
}

const displayedActivities$ = of([
  { results$: 
    of([
      [1,2,3],
      [4,5,6],
      [7,8,9]
    ]).pipe(intervalArray(250))
  },
  { results$: 
    of([
      [10,20,30],
      [40,50,60],
      [70,80,90]
    ]).pipe(intervalArray(300))
  },
  { something: "else" },
  { results$: 
    of([
      [100,200,300],
      [400,500,600],
      [700,800,900]
    ]).pipe(intervalArray(350))
  }
]);

合并所有结果$

const allResults$ = displayedActivities$.pipe(
  mergeMap(activities => 
    combineLatest(...activities.filter(
      activity => activity?.results$ != null
    ).map(
      activity => activity.results$.pipe(startWith([]))
    ))
  ),
  map(latest => latest.flat())
).subscribe(console.log);

输出:

[1,2,3]
[1,2,3,10,20,30]
[1,2,3,10,20,30,100,200,300]
[4,5,6,10,20,30,100,200,300]
[4,5,6,40,50,60,100,200,300]
[4,5,6,40,50,60,400,500,600]
[7,8,9,40,50,60,400,500,600]
[7,8,9,70,80,90,400,500,600]
[7,8,9,70,80,90,700,800,900]

关于angular - 如何连接嵌套在另一个 Observable 中的多个 Observable 集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66317277/

相关文章:

angular - 使用 FileSaver 和 blob 将响应正文另存为文件

version-control - 当使用 Borland StarTeam 更改文件夹结构时,如何合并我的文件?

python - 在给定阈值内合并范围(间隔)的有效方法

Angular 7 属性 'share' 在可观察类型中不存在

Angular Firestore 通过查询获取单个文档

javascript - 将 Typescript 2.3 模块发布到 NPM 以供 Angular 4 使用

angular - 如何在 angular 2 中的 formcontrolname 上应用管道

Matlab - 合并两个向量和一个不同维度的矩阵

javascript - 组合 RXJS 可观察量,但等待第一个开始发出值

angular - RXJS Observable 将数组转换为多个值