rxjs - 如何知道 switchMap 是否取消订阅内部 Observable?

标签 rxjs observable rxjs6 reactivex switchmap

由于如果有新项目进入,switchMap 可能会切断(取消订阅)内部 Observable,因此我们期望出现以下情况:

  const items = from([1, 2])
  const seen = []
  const derived = items.pipe(
    switchMap(item =>
      concat(
        of(`now: ${item}`),
        of(`end: ${item}`).pipe(delay(10))
      )
    )
  )

  derived.subscribe(next => seen.push(next))

导致seen具有:

['now: 1', 'now: 2', 'end: 2']
           ^ unsubscribed to 1 here, as planned!

问题是,有没有办法收到第 1 项已取消的通知?原因是,如果取消发生得太频繁,我想知道。

我尝试过修改内部 Observables 的方法,但基本上我尝试在它们“最后”做的任何事情在取消订阅时都不会发生。观察者规范有 nextcompletederror,但没有“提前结束”的内容,所以我不确定要尝试什么。

我希望能够向 switchMap 传递一个附加函数,当取消内部可观察值并提供参数时,该运算符将调用该函数。

  const derived = items.pipe(
    switchMap(item => concat(
      of(`now: ${item}`),
      of(`end: ${item}`).pipe(delay(10))
    ),
    (item) => console.error(`${item} was cut off.`))
  )

最佳答案

编辑:RxJS GitHub 页面上有一个问题讨论 finalize 运算符的此功能。 https://github.com/ReactiveX/rxjs/issues/2823

由于 switchMap 取消订阅内部 Observable,因此它不会收到任何完整的通知。

相反,您可以只添加一个处置处理程序或简单地说使用 finalize() 运算符来为您完成此操作,并在处置链时调用它。这适用于完成事件和取消订阅事件。

switchMap(item =>
  concat(
    of(`now: ${item}`),
    of(`end: ${item}`).pipe(delay(10))
  ).pipe(
    finalize(() => /* switchMap unsubscribed or concat completed */),
  )
)

请注意,当您取消订阅时,这意味着您不想再从 Observable 接收任何项目。这意味着取消订阅时您无法从已取消订阅的 Observable 接收任何新项目。例如,有一个 defaultIfEmpty 运算符,它会在源完成时发出一个项目,而不发出任何适用于完整通知的内容,但在您取消订阅时则无法使用。

关于rxjs - 如何知道 switchMap 是否取消订阅内部 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54149656/

相关文章:

java - RxJava : How to run two sequential calls : second depends on first

javascript - 了解 rxjs 中的 SwitchMap

javascript - Observable 订阅者在哪里执行

angular - 如何为对象创建行为主题并在另一个组件上订阅它?

javascript - 操作 RxJS 流并发布结果的 Observable 的正确方法是什么?

ajax - Angular 2/4 & RxJS - forEach 内的订阅 - 确保流量控制在所有 Observable 完成之前不会继续

angular - 将数据从一个组件传递给服务,然后再传递给另一个组件

javascript - 可观察数组的 knockout 过滤

javascript - 更好的 Observable typescript 模式 if x of Observable<X> else y of Observable<Y>

error-handling - 如何在可观察 map (rxjs6,ng6)中抛出错误