RxJS:如何动态合并可观察对象

标签 rxjs

我想将2个或更多可观察物动态地组合为一个组合可观察物。

我了解了如何结合merge已经存在的两个可观察对象,但是当需要动态添加其他可观察对象时(例如在超时后),如何解决“合并”问题?

此外,合并另一个可观察的“运行中”消息时,不应丢失combinedStream$上的现有订阅。

这是我到目前为止的内容:

const action1$ = interval(1000).pipe(map(data => 'Action1 value:' + data));
const action2$ = interval(1000).pipe(map(data => 'Action2 value:' + data));

const combinedStream$ = merge(action1$, action2$);
combinedStream$.subscribe(data => console.log('Combined Stream Output:', data));

// Add another observable after some time...
setTimeout(() => {
  const action3$ = interval(1000).pipe(map(data => 'Action3 value:' + data));
  // How add this action3$ to the combined stream ?
}, 1000);

这是我的 Blitz :https://stackblitz.com/edit/rxjs-s2cyzj

最佳答案

在处理该用例时,最简单的事情是要有一个可观察的...可观察的,然后使用更高阶的函数,例如concatAllswitchmergeAll ...

const action1$: Observable<string> = interval(2000).pipe(
  map(data => "Action1 value:" + data)
);
const action2$: Observable<string> = interval(2000).pipe(
  map(data => "Action2 value:" + data)
);
const action3$: Observable<string> = interval(2000).pipe(
  map(data => "Action3 value:" + data)
);

const mainStream$$: Subject<Observable<string>> = new Subject();

const combinedStream$ = mainStream$$.pipe(mergeAll());

combinedStream$.subscribe(data => console.log("Combined Stream Output:", data));

mainStream$$.next(action1$);
mainStream$$.next(action2$);
// Add another stream after some time...
setTimeout(() => {
  mainStream$$.next(action3$);
}, 1000);

演示:https://stackblitz.com/edit/rxjs-stwvtc?file=index.ts

关于RxJS:如何动态合并可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59922545/

相关文章:

javascript - Redux 可观察到的 : dispatch action on multiple clicks (two or more)

angular - 如何直接在 ngxs 选择器上使用 rxjs 管道

angular - 使用 angular 和 rxjs 实现轮询

javascript - TypeScript 函数返回多个选项

javascript - 在带有十字符号的文本框中搜索结果

angular - 在 rxjs/Observables 范围内,变量名上的尾随美元符号是什么意思?

angular - Plunker 在更新到 Angular 6 和 rxjs 6 后损坏

javascript - 通过可观察对象映射并返回嵌套可观察对象的值

javascript - Angular 5 服务正在从 api 返回数据,但组件没有向我显示数据

node.js - 使用 rxjs ajax() 我得到 "CORS is not supported by your browser"