javascript - Observer 是 RxJS 中 Observable 的 'listener' 吗?

标签 javascript rxjs observable

我正在学习 RxJS 并且对于“监听器”在哪里(在 Observable 或 Observer 中)、他们如何订阅/取消订阅以及当观察者“不再对”一个对象“不再感兴趣”时会发生什么感到很困惑可观察的,例如当您使用 taketakeUntil 时。

对于第一部分——什么是订阅者,什么是听众——我对这些陈述之间看似矛盾的事情感到困惑。来自 http://reactivex.io/rxjs/manual/overview.html我们读到 Observers 不是 Observables 的“听众”

This is drastically different to event handler APIs like addEventListener / removeEventListener. With observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain a list of attached Observers.

但在http://reactivex.io/learnrx/它说(练习 30)(突出显示我的)

An Observable based on an Event will never complete on its own. The take() function creates a new sequence that completes after a discrete number of items arrive. This is important, because unlike an Event, when an Observable sequence completes it unsubscribes all of its listeners. That means that if we use take() to complete our Event sequence, we don't need to unsubscribe!

这对我来说似乎是矛盾的。例如,当您使用 fromEvent 设置 Observable 时,事件监听器在哪里?例如,在基于 DOM 事件的 Observable 上使用 take(1) 时,第一个事件发送给观察者后会发生什么? Observer 是否从 Observable 中取消订阅,它继续发出事件,只是 Observer 不再收听它们了?或者 Observable 是否以某种方式取消了对 Observer 的订阅,也就是说,eventListener 在 Observable 中,而不是 Observer 中?

感谢您提供任何线索 - 显然我不是只见树木不见森林,但我正在学习的教程虽然擅长从概念上进行解释,但让我对实际发生的事情感到困惑.

最佳答案

第一部分在用词上特别讲究,以强调订阅一个可观察对象是调用一个函数(或更可能是函数链)来运行它们包含的所有代码的问题。第二部分在措辞上不太讲究,但实际上并不是在谈论同一件事。如果您愿意,第二部分可以更好地表述为“当一个可观察对象完成时,它会对其观察者调用拆卸逻辑。

当我说订阅一个可观察对象是一个调用函数链的问题时,让我试着描述一下我的意思。考虑以下 super 简单的示例:

举一个 super 简单的例子,假设我创建了这个 observable:

const justOne = Rx.Observable.create(function realSubscribe(observer) {
  observer.next(1);
  observer.complete();
});

justOne.subscribe(val => console.log(val));

如果我随后调用 justOne.subscribe(val => console.log(val)),这样做将立即调用我命名为 realSubscribe 的函数。然后执行 observer.next(1),这会导致注销 val,然后执行 observer.complete()。就是这样。

在此过程中,可观察对象没有创建或扩充订阅者列表;它只是按顺序运行代码然后完成。


现在转到一个更实际的示例,让我们考虑一下 fromEvent。如果我要实现它,它可能看起来像这样(real implementation 更复杂,但这就是它的要点):

function fromEvent(element, eventName) {
  return Rx.Observable.create(function subscribeToEvent(observer) {
    element.addEventListener(eventName, observer.next);
    return function cleanup() {
      element.removeEventListener(eventName, observer.next);
    }
  });
}

const observable = fromEvent(document, 'click');
const subscription = observable.subscribe(event => console.log(event));

现在,当我调用 observable.subscribe 时,它​​会运行 subscribeToEvent,并在这样做时调用文档上的 addEventListener。 document.addEventListener 确实导致文档保留事件监听器列表,但这是因为 addEventListener 的实现方式,而不是所有可观察对象共有的东西。可观察对象本身不跟踪任何听众。它只是调用它被告知要调用的内容,然后返回一个清理函数。


接下来让我们看看 take。和以前一样 real implementation更复杂,但大致是这样的:

// In the real `take`, you don't need to pass in another observable since that's
// available automatically from the context you called it in. But my sample code
// has to get it somehow.
function take(count, otherObservable) {
  return new Observable(function subscribeToTake(observer) {
    let soFar = 0;
    otherObservable.subscribe((value) => {
      observer.next(value);
      soFar++;
      if (soFar >= count) {
        observer.complete();
      }
    });
  });
}

const clickObservable = fromEvent(document, 'click');
take(1, clickObservable).subscribe(event => console.log(event))

如评论中所述,我使用的语法与它在 rxjs 中的使用方式并不完全匹配,但那是因为模仿它需要更完整的实现。无论如何,引起您注意的主要事情是我们开始制作一系列函数:

当我调用 .subscribe 时,它会调用 subscribeToTake。这样就设置了一个计数器,然后调用otherObservable.subscribe,也就是subscribeToEvent。 subscribeToEvent 然后调用 document.addEventListener。

Take 的工作是处于这个函数链的中间。它跟踪到目前为止已经发出了多少值。如果计数足够低,它只会转发这些值。但是一旦达到计数,它就会调用 complete,从而结束 observable。调用 complete 会导致 observable 运行它拥有的任何拆卸逻辑,或者它的链拥有的任何东西。 take 没有拆解逻辑,但 fromEvent 将运行一些拆解逻辑来删除事件监听器。

关于javascript - Observer 是 RxJS 中 Observable 的 'listener' 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57858190/

相关文章:

javascript - Vuejs 语法错误 : Unexpected identifier

Javascript,分数被附加而不是添加到分数字段?

rxjs 轮询定时器数据并在手动刷新时重置定时器

java - 如何用 lambda 替换 Observable 匿名内部类(包括记录器)以进行多个 Couchbase 访问?

javascript - 为什么这个函数不返回 JSON 字符串?

javascript - 谷歌地图 API 缩放条不显示

rxjs - 我是否必须取消订阅 rxjs 中已完成的可观察对象?

angular - 如何正确使用 Observable 和 Angular 异步管道?

angular - 如何以 Angular 取消/关闭行为主题

angular - typescript : error TS2740: Type '{}' is missing the following properties from type 'Product[]' : length, 弹出、推送、连接等 26 个