javascript - SwitchMap 和 PublishReplay 组合

标签 javascript rxjs reactive-programming

我正在开发一个 Angular 项目,但我不完全理解为什么此代码片段没有按预期工作:

const httpObs = Rx.Observable.of("")
  .do(() => console.log("trigger http call")) //only triggered once
  .publishReplay(1).refCount();

let observable = Rx.Observable.timer(0,1000)
  .do((e) => console.log("new event: " + e))
  .switchMap(() => this.httpObs);

const c1 = observable.subscribe(() => console.log("subscriber 1"));
const c2 = observable.subscribe(() => console.log("subscriber 2"));

JS Bin

我不明白为什么 Rx.Observable.timer 发出的每个事件都不会触发“触发 http 调用”。我的期望是,只要有一个订阅者订阅了可观察对象,每个发出的事件都会立即触发 http 调用。然而,每个订阅者都会收到相同的排放量,这意味着每个事件只有一个 http 调用。

我知道我可以通过在可观察对象上使用publishReplay + refcount来修复它(见下文),但这仅适用于这个简单的示例。

我真正想要实现的是两个组件使用相同的 http 调用响应,并且每个组件发出一个 onInit 事件,然后将其 switchMapped 到 http 响应。我不希望每个新订阅者都触发调用。但是,我希望在发出另一个事件时触发该调用。 onInit observable 与 click-observable 合并,每当用户单击“更新”按钮时,在 switchMap 操作符用于转换之前,click-observable 就会发出新事件。现在的问题是,更新按钮不再触发 http 调用。

此示例显示了它的行为方式:

const httpObs = Rx.Observable.of("")
  .do(() => console.log("trigger http call"));

let observable = Rx.Observable.timer(0,1000)
  .do((e) => console.log("new event: " + e))
  .switchMap(() => this.httpObs)
  .publishReplay(1);

observable.subscribe(() => console.log("subscriber 1"));
observable.subscribe(() => console.log("subscriber 2"))

observable.connect();

JS Bin

最佳答案

您遇到的问题是所有主题实例都有其内部状态,并且它们何时收到 completeerror它们将自己标记为“已停止”并且永远不会发出任何通知(这是所谓的 Observable 合约的要求)。你可以看看这篇文章https://medium.com/@martin.sikora/rxjs-subjects-and-their-internal-state-7cfdee905156我前段时间写过。它涵盖了非常相似的主题。

在您的代码中,您使用相同的 httpObs多次,但源 Observable 是 of()发出单个值,然后发送 complete通知。

看看它真的叫:http://jsbin.com/fuvuxax/2/edit?js,console

const httpObs = Rx.Observable.of("")
  .do(() => console.log("trigger call"), undefined, () => console.log('complete'))
  .publishReplay(1).refCount();

这意味着 publishReplay 内的主题收到complete通知并且不会再次订阅其源 Observable。然而,自从 publishReplay内部使用ReplaySubject它仍然会刷新其缓冲区,然后发送 complete通知。但这对您来说不是很有用,因为我认为您想要再次执行 HTTP 调用,而不仅仅是重放过时的调用。

你能做的就是转动 httpObs进入每次创建链的方法 switchMap接收一个值:

const httpObs = () => Rx.Observable.of("")
  .do(() => console.log("trigger call"), undefined, () => console.log('complete'));

let observable = Rx.Observable.timer(0,1000)
  .do((e) => console.log("new event: " + e))
  .take(3)
  .switchMap(() => this.httpObs())
  .publishReplay(1).refCount();

observable.subscribe(() => console.log("subscriber 1"));
observable.subscribe(() => console.log("subscriber 2"));

您可以看到打印 "trigger call"只有三次,如果我正确理解你的描述,我想这就是你想要的:

"trigger call"
"subscriber 1"
"subscriber 2"
"complete"
"trigger call"
"subscriber 1"
"subscriber 2"
"complete"
"trigger call"
"subscriber 1"
"subscriber 2"
"complete"

关于javascript - SwitchMap 和 PublishReplay 组合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48457721/

相关文章:

javascript - 动态更新表行

javascript - 如何翻转具有特定方向的图像

angular - 了解 Promise 与 Observables 的结果

javascript - 了解 rxjs 中的 SwitchMap

javascript - 如何获取h :inputHidden element whose value is calculated using jquery javascript的值

javascript - 合并 JS 对象而不覆盖

Angular 2 (ng2) 在 [class.whatever] 中使用异步管道

javascript - forkJoin 中的日志行未被调用

ios - RxSwift - 在 X 秒内未收到元素后发出

java - 使用 RxJava 跳过最后一个重复项