typescript - 想要缓冲一个 Observable 直到另一个 Observable 触发,然后删除缓冲区并使用一个订阅正常触发

标签 typescript rxjs reactive-programming

我正在我的 Angular 应用程序中实现分析服务。我在服务的构造函数中创建并附加第 3 方脚本。可能存在竞争条件,即其他服务在加载脚本之前尝试触发遥测事件。我想在遥测方法上创建一个缓冲区来保存消息,直到脚本加载,然后刷新缓冲区,然后按正常方式推送。

伪代码:

// Global variable set by 3rd party library after it loads
declare let analytics: any;

class AnalyticsService {
  isLoaded$ = new BehaviorSubject<boolean>(false);
  identify$ = new BehaviorSubject<user>(null);

  constructor() {
    this.loadScript();

    // Here, I want to buffer messages until the 3rd party script initializes
    // Once isLoaded$ fires, forEach the queue and pass to the 3rd party library
    // Then for every message after, send one at a time as normal like 
    // the buffer wasn't there
    this.identify$
      .pipe(buffer(this.isLoaded$.pipe(skip(1)) // Essentially want to remove this after it fires
      .subscribe((user) => analytics.identify(user));
  }

  loadScript(): void {
    const script = document.createElement('script');
    script.innerHTML = `
      // setup function from 3rd party
    `;

    document.querySelector('head').appendChild(script);

    interval(1000)
      .pipe(take(5), takeUntil(this.isLoaded$))
      .subscribe(_ => {
        if (analytics) this.isLoaded$.next(true);
      })
  }

  identify(user): void {
    this.identify$.next(user);
  }
}

如果我使用两个订阅,就会像

identify$
  .pipe(
    takeUntil(this.isLoaded$),
    buffer(this.isLoaded$),
  ).subscribe(events => events.forEach(user => analytics.identify(user)));

identify$
  .pipe(
    filter(_ => this.isLoaded$.value),
  ).subscribe(user => analytics.identify(user))

有没有办法通过一次订阅来做到这一点?

最佳答案

这可能是实现您正在寻找的目标的方法:

constructor () {
  this.loadScript();

  this.identify$
    .pipe(
      buffer(
        concat(
          this.isLoaded$.pipe(skip(1), take(1)),
          this.identify.pipe(skip(1)),
        )
      )
    )
    .subscribe((user) => analytics.identify(user));
}

要点在于

concat(
  this.isLoaded$.pipe(skip(1), take(1)),
  this.identify$.pipe(skip(1)),
)

因此,我们首先等待 isLoaded$ 发出 true,然后 this.identify.pipe(skip(1)) 将是已订阅。

加载脚本后,您希望在 this.identify$ 发出时立即继续。这就是我们从缓冲区的关闭通知程序再次订阅它的原因。基本上,this.identify$ 现在将有 2 个订阅者。第一个是 subscribe((user) => Analytics.identify(user)),第二个是来自 concat 的(即 buffer) code> 的关闭通知程序)。当 this.identify$ 发出时,该值将按顺序发送给其订阅者。因此,该值最终将被添加到缓冲区中,然后立即传递给链中的下一个订阅者,因为 this.identify$ 的第二个订阅者将同步接收该值。

关于typescript - 想要缓冲一个 Observable 直到另一个 Observable 触发,然后删除缓冲区并使用一个订阅正常触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65365663/

相关文章:

javascript - PdfMake 要重复的项目列表(文本旁边的复选框)

javascript - 转义 Angular 模板表达式以在 Angular HTML 组件模板中显示?

javascript - 如何在 typescript 中包含原型(prototype)

angular - 返回 false 而不是 undefined

java - RxJava : how to code something like doOnEmpty?

ios - ReactiveSwift 中的 API 请求

java - 不使用 Observable.create() 时如何查看订阅状态?

javascript - 如何让 Angular2+ 路由器在 Angular2+/AngularJS 混合应用程序上工作?

javascript - 如何在 Angular 6 中取消 http 请求?

angular - 为什么组合的可观察对象在使用 Subject 时不更新模板,或者如果它们在 ngAfterContentInit 之后发出