javascript - 缓冲 rx.js observable 的多个订阅者

标签 javascript buffer rxjs observable subject

我有

  var subject = new rx.Subject();
  var stream =     rx.Observable.fromEvent(blah, 'event')
                  .filter(blah)
                  .map(blah)
                  .subscribe(subject);

                return subject;

然后我将主题传递给几个不同的处理程序,这些处理程序将以不同的方式和不同的速度处理事件。
所以我在每个处理程序中拥有的是

subject.subscribe(async function (x) {
        const func = self[x.eventName];
        if (func) {
          await eventHandlerWrapper(self.handlerName, func, x);
        }
      })

我有两个问题, a)如果事件来得 super 快,处理程序是否会按照我的方式同步并以正确的顺序处理它们? b) 如果不同的处理程序以不同的速度处理事件,它们是否都会等到最慢的处理程序完成后再提供下一个事件?或者他们会按照自己的节奏进行缓冲和处理吗?

谢谢你,

最佳答案

首先,主题的创建可以这样简化:

const subject = rx.Observable.fromEvent(blah, 'event')
              .filter(blah)
              .map(blah)
              .share();

share 方法将从流中创建一个主题。如果您将此主题实例返回给每个订阅者,您将获得相同的行为并且看起来更好。

 a) if the events come in super fast is the handler going to process
 them synchronously and in the right order given the way I have it?

事件将按照正确的顺序一一推送到整个链中。这意味着,在处理下一个值之前,通过“fromEvent”传入的事件将被推送到整个链,直到您订阅它为止(除非之间有异步运算符:))。 Ben Lesh 在 2015 年 Angular Connect 上解释了这一点:https://www.youtube.com/watch?v=KOOT7BArVHQ (你可以观看整个演讲,但在第 17 分钟左右,他将数组与可观察量进行了比较)。

b) if the different handlers handle the event at different speeds are 
they all going to wait till the slowest handler is through before the    
next event is provided? or will they all sort of buffer and handle at  
they're own pace?

他们将按照自己的节奏处理事件。检查以下示例:

let interval$ = Rx.Observable.interval(1000).share();

interval$.concatMap((val) => {
    console.log('called');
    return Rx.Observable.of(val).delay(3000)
  })
  .subscribe((val) => console.log("slow ", val));

interval$.subscribe((val) => console.log("fast ", val));

这里我使用了一个区间可观察量,并将其转换为一个主题。所以它每秒都会发出一个事件。我有一个订阅正在获取一个值,处理该值(需要 2 秒),然后获取下一个值(使用 concatMap)。另一个订阅可以立即处理它们。如果您运行此代码(此处为 jsbin: https://jsbin.com/zekalab/edit?js,console ),您将看到它们都按照自己的节奏处理事件。

因此它们不会等待最慢的处理程序,它将在内部缓冲。

如果最慢的处理器慢于事件抛出的频率,您所描述的情况可能会出现潜在的危险情况。在这种情况下,您的缓冲区将不断增长,最终您的应用程序将崩溃。这是一个叫做背压的概念。您获取事件的速度比处理事件的速度快。在这种情况下,您需要在最慢的处理器上使用“缓冲区”或“窗口”等运算符来避免这种情况。

关于javascript - 缓冲 rx.js observable 的多个订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40538960/

相关文章:

javascript - Angular 4显示当前时间

javascript - Backbone.js 空数组属性

javascript - 为什么state在foreach中更新时不将Array state中的所有数据保留在hook中?

memory - Node.js 内存泄漏

javascript - RxJS 可观察到的.concat : How to know where next result came from?

Angular 6 Async-await 无法处理 http 请求

javascript - 如何将输入字段值视为数字?

java - 如何使用javascript调用action

c - C中的缓冲区操作

c++ - 如何保存两台摄像机的数据而不影响它们的图像采集速度?