javascript - 合并来自不断变化的 Observables 列表的事件

标签 javascript events rxjs reactive-programming reactivex

我正在使用rxjs .

我有一个Browser这导致了许多Page对象。每个页面都有一个Observable<Event>这会产生事件流。

Page对象在不同的​​时间关闭和打开。我想创建一个可观察的,名为 TheOneObservable这将合并所有当前事件的 Page 中的所有事件对象,并且还合并来自 Browser 的自定义事件对象本身。

关闭Page意味着应该关闭对它的订阅,这样就不会阻止它被 GC 处理。

我的问题是Pages可以随时关闭,这意味着被合并的 Observable 数量始终在变化。我想过使用 Pages 的 Observable并使用 mergeMap ,但是这样做有问题。例如,订阅者只会收到订阅后打开的Pages的事件。


请注意,此问题已得到解答 here对于 .NET ,但使用 ObservableCollection rxjs 中不可用.


下面是一些代码来说明问题:

class Page {
    private _events = new Subject<Event>();

    get events(): Observable<Event> {
        return this._events.asObservable();
    }
}

class Browser {
    pages = [] as Page[];
    private _ownEvents = new Subject<Event>();

    addPage(page : Page) {
        this.pages.push(page);
    }

    removePage(page : Page) {
        let ixPage = this.pages.indexOf(page);
        if (ixPage < 0) return;
        this.pages.splice(ixPage, 1);
    }

    get oneObservable() {
        //this won't work for aforementioned reasons
        return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
    }
}

它是用 TypeScript 编写的,但应该是可以理解的。

最佳答案

您可以在链接到数组更改的 Subject() 上使用 switchMap(),在数组更改时将 oneObservable 替换为新的 Observable。

pagesChanged = new Rx.Subject();

addPage(page : Page) {
  this.pages.push(page);
  this.pagesChanged.next();
}

removePage(page : Page) {
  let ixPage = this.pages.indexOf(page);
  if (ixPage < 0) return;
  this.pages.splice(ixPage, 1);
  this.pagesChanged.next();
}

get oneObservable() {
  return pagesChanged
    .switchMap(changeEvent =>
      Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
    )
}

测试,

const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }

let pages = []; 
const pagesChanged = new Rx.Subject();
const addPage = (page) => { 
  pages.push(page); 
  pagesChanged.next(); 
}
const removePage = (page) => { 
  let ixPage = pages.indexOf(page);
  if (ixPage < 0) return;
  pages.splice(ixPage, 1);
  pagesChanged.next(); 
}

const _ownEvents = Rx.Observable.of('ownEvent')

const oneObservable = 
  pagesChanged
    .switchMap(pp => 
      Rx.Observable.from(pages)
        .mergeMap(x => x.events)
        .merge(_ownEvents)
    )

oneObservable.subscribe(x => console.log('subscribe', x))

console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

关于javascript - 合并来自不断变化的 Observables 列表的事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48269819/

相关文章:

javascript - Google 日历事件列表 jQuery 格式

javascript - dojox.enhancedGrid 插件 IndirectSelection 中的 dojo.connect

spring - ContextRefreshedEvent、ContextStartedEvent、ContextStoppedEvent 和 ContextClosedEvent 有什么区别

RxJS - toBlocking 运算符

javascript - 使用 jquery 创建覆盖但内容继承 CSS 不透明度

javascript - 如何使 div 内的 ul 标签可滚动

rxjs - 构造 rxjs/Observable 时出错

RxJS:我如何 "manually"更新 Observable?

javascript - 狡猾的JS碰撞运动

javascript - 随着时间的推移进展事件