rxjs - 使用自定义功能扩展 Observable

标签 rxjs rxjs5

我的情况:我有一个数据存储,可以在其中获取可以使用 RxJS Observables 观察的模型。一般类型签名是

const foo = new Model({id: 123}, dataStore);
foo.asObservable().subscribe((v) => /* do stuff with values of this model */ );

一般的概念是,像对待可观察量一样处理数据有两种帮助:(a) 当你遇到冷缓存情况时(比如浏览器的 localStorage 有一些过时的数据)但仍然想显示一些东西正在获取实际数据,并且 (b) 当您已经加载了正确的数据,但在本地或在需要向上传播的后端对其进行了更改。到目前为止,这效果相当好。

下一个级别的问题是模型与其他模型之间存在关系(例如, parent 有很多 child )。像往常一样,我可以做类似的事情

foo.asObservable().subscribe(v => console.log(v.relationships.children))

并且(忽略此处的空错误),我最初会得到 [1, 2, 3] ,然后当 4 添加到父子关系中时得到 [1, 2, 3, 4] 。到目前为止和我在一起吗?

问题是我经常想要访问这些子项,不是作为索引,而是作为它们本身的可观察模型(这样我就可以显示用户社区所有成员的名字,例如,社区和所有成员都是数据模型)。我目前正在 Controller 代码中使用大量样板来执行此操作,其中涉及对 .combineLatest 的大量调用。

我想要做的是为这种类型的对象定义一个自定义运算符,这样我就可以将其全部样板化。理想情况下它看起来像:

foo.asObservable().inflateRelationship('members').subscribe(
  (v) => // v === [{name: 'steve'}, {name: 'gertude'} ...] etc
);

我实际上已经部分工作了,但问题是启动实际的链。我正在关注instructions for extending Observable ,创建一个实现 lift 的新 CustomObservable 类,但我的问题是我无法在此处使用静态 Observable 方法(例如 Observable.merge())来在 Model 中生成初始 observable。 asObservable.

我的问题来了:

const preload$ = Observable... 
  // create the "load from cache and backend observable"
const update$ = Observable ... 
  // create the "update after load when the storage updates observable" 

return new CustomObservable(context).merge(preload$, update$);

最后一行失败了。我想返回由两个常规可观察流上的合并运算符制成的 CustomObservable。我需要将上下文添加到那里的构造函数,因为该上下文包含对实际膨胀子模型所需的数据存储的引用(没有它, id 数组流就没有意义)。

这就是我的具体问题:我创建了一个 Observable 运算符,我想将其作为类添加到 CustomObservable 中,因此我可以像平常一样使用下游运算符,但我似乎无法正确获取整个链开始了。

任何指针,甚至是正确(且非平凡)扩展 Observable 类的现有项目,都将受到欢迎。我尝试深入研究源代码,但我什至无法弄清楚该部分(看起来 Observable 类静态被添加到其他地方,乍一看非常不清楚,因为 Observable class itself 中没有定义任何内容) .

最佳答案

这是“写一个很长的问题来堆栈溢出并在发布后不久找出答案”的情况之一,但我想我会为了后代而写下答案。

假设您已按照说明对 Operator 进行子类化,您要做的就是

asObservable() {
  // do a bunch of stuff making different things
  return Observable.merge(one$, two$)
    .let(obs => new CustomObservable(context, obs);

然后在 CustomObservable 中你有

class CustomObservable extends Observable {
  constructor(context, source) {
    super();
    this.source = source;
  }
  customOperator() {}
  lift(operator) {
    const obs = new CustomObservable(context, this);
    obs.operator = operator;
    return obs;
  }
}

这让我可以做到

Model.asObservable()
.filter() // normal RxJs operator here
.customOperator() // yay
.map() // back to other RxJs operators
.subscribe(v => console.log(v)) // or whatever

所以,是的。现在我的角度模型看起来更加空闲。

关于rxjs - 使用自定义功能扩展 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44599304/

相关文章:

javascript - 需要 RxJS 说明 : how to return a plain object instead of an Observable

javascript - 如何对我的 Observable 的 catch 主体进行单元测试

angular - 使用间隔时如何立即启动 Angular 2 Observable

angular - 文本行连续闪烁/闪烁

unit-testing - RxJS Redux-Observables 测试 retryWhen inside an epic

angular - 类型 'concatMap' 上不存在属性 'Observable<Object>'?

angular - "flatten"数组在 RxJS Observable 中的最佳方式

angular - 在发出两个可观察对象的第一个值后,Zip 不发出值

javascript - 组合最新运算符替代方案

node.js - RxJS 与 rx-node