我正在使用rxjs
.
我有一个Browser
这导致了许多Page
对象。每个页面都有一个Observable<Event>
这会产生事件流。
Page
对象在不同的时间关闭和打开。我想创建一个可观察的,名为 TheOneObservable
这将合并所有当前事件的 Page
中的所有事件对象,并且还合并来自 Browser
的自定义事件对象本身。
关闭Page
意味着应该关闭对它的订阅,这样就不会阻止它被 GC 处理。
我的问题是Pages
可以随时关闭,这意味着被合并的 Observable 数量始终在变化。我想过使用 Pages
的 Observable并使用 mergeMap
,但是这样做有问题。例如,订阅者只会收到订阅后打开的Pages的事件。
请注意,此问题已得到解答 here对于 .NET
,但使用 ObservableCollection
rxjs
中不可用.
下面是一些代码来说明问题:
class Page {
private _events = new Subject<Event>();
get events(): Observable<Event> {
return this._events.asObservable();
}
}
class Browser {
pages = [] as Page[];
private _ownEvents = new Subject<Event>();
addPage(page : Page) {
this.pages.push(page);
}
removePage(page : Page) {
let ixPage = this.pages.indexOf(page);
if (ixPage < 0) return;
this.pages.splice(ixPage, 1);
}
get oneObservable() {
//this won't work for aforementioned reasons
return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
}
}
它是用 TypeScript 编写的,但应该是可以理解的。
最佳答案
您可以在链接到数组更改的 Subject()
上使用 switchMap()
,在数组更改时将 oneObservable 替换为新的 Observable。
pagesChanged = new Rx.Subject();
addPage(page : Page) {
this.pages.push(page);
this.pagesChanged.next();
}
removePage(page : Page) {
let ixPage = this.pages.indexOf(page);
if (ixPage < 0) return;
this.pages.splice(ixPage, 1);
this.pagesChanged.next();
}
get oneObservable() {
return pagesChanged
.switchMap(changeEvent =>
Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
)
}
测试,
const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }
let pages = [];
const pagesChanged = new Rx.Subject();
const addPage = (page) => {
pages.push(page);
pagesChanged.next();
}
const removePage = (page) => {
let ixPage = pages.indexOf(page);
if (ixPage < 0) return;
pages.splice(ixPage, 1);
pagesChanged.next();
}
const _ownEvents = Rx.Observable.of('ownEvent')
const oneObservable =
pagesChanged
.switchMap(pp =>
Rx.Observable.from(pages)
.mergeMap(x => x.events)
.merge(_ownEvents)
)
oneObservable.subscribe(x => console.log('subscribe', x))
console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
关于javascript - 合并来自不断变化的 Observables 列表的事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48269819/