javascript - RxJS Operator 延迟处理后续事件

标签 javascript angular typescript rxjs observable

我正在寻找一个 RxJS 运算符(或运算符的组合),它可以让我实现这一点:

  • 由 observable 触发的每个事件都应该按照它被触发的顺序来处理(没有跳过、限制、去抖动任何值)。
  • 我应该能够在处理 2 个后续事件之间定义一个设定的时间间隔。
  • 如果当前没有处理先前的事件,则应立即处理下一个事件(无延迟)。

  • 我创建了 following example我希望这会使这一点更清楚:
    import { Component, OnInit } from "@angular/core";
    import { Subject } from "rxjs";
    import { delay } from "rxjs/operators";
    
    @Component({
      selector: "app-root",
      templateUrl: "./app.component.html",
      styleUrls: ["./app.component.css"]
    })
    export class AppComponent implements OnInit {
      counter = 1;
      displayedValue = 1;
    
      valueEmitter = new Subject<number>();
      valueEmitted$ = this.valueEmitter.asObservable();
    
      ngOnInit() {
        // I want to allow each incremented value to be displayed for at least 2 seconds
        this.valueEmitted$.pipe(delay(2000)).subscribe((value) => {
          this.displayedValue = value;
        });
      }
    
      onClick() {
        const nextCounter = ++this.counter;
        this.counter = nextCounter;
        this.valueEmitter.next(nextCounter);
      }
    }
    
    在本例中,我希望 counter 中的每个值都增加。显示为 displayedValue至少 2 秒。
    所以用户第一次点击onClick显示的值应该立即从 1 变为 2,但如果用户继续快速增加 counter displyedValue应该以延迟的方式跟随,允许每个先前增加的数字在更改之前显示 2 秒,慢慢 catch 当前的计数器值。
    这可以通过 RxJs 运营商实现吗?
    更新
    目前我想出了following solution :
    import { Component, OnInit } from "@angular/core";
    import { Subject } from "rxjs";
    
    @Component({
      selector: "app-root",
      templateUrl: "./app.component.html",
      styleUrls: ["./app.component.css"]
    })
    export class AppComponent implements OnInit {
      counter = 1;
      displayedValue = 1;
      currentDisplayedValueStartTime = 0;
      readonly DISPLAY_DURATION = 2000;
      valuesToDisplay = [];
      displayedValueSwitcherInterval: number | null = null;
    
      valueEmitter = new Subject<number>();
      valueEmitted$ = this.valueEmitter.asObservable();
    
      ngOnInit() {
        this.valueEmitted$.pipe().subscribe((value) => {
          this.handleValueDisplay(value);
        });
      }
    
      onClick() {
        const nextCounter = ++this.counter;
        this.counter = nextCounter;
        this.valueEmitter.next(nextCounter);
      }
    
      handleValueDisplay(value) {
        this.valuesToDisplay.push(value);
    
        if (!this.displayedValueSwitcherInterval) {
          this.displayedValue = this.valuesToDisplay.shift();
    
          this.displayedValueSwitcherInterval = window.setInterval(() => {
            const nextDiplayedValue = this.valuesToDisplay.shift();
            if (nextDiplayedValue) {
              this.displayedValue = nextDiplayedValue;
            } else {
              clearInterval(this.displayedValueSwitcherInterval);
              this.displayedValueSwitcherInterval = null;
            }
          }, this.DISPLAY_DURATION);
        }
      }
    }
    
    这不是我想要的,因为它依赖于组件状态和 setInterval为了管理通知队列。我希望找到一些更干净、更具声明性的东西,它依赖于 RxJs 操作符来处理我的通知流和它们的间隔队列——但至少我让这个功能完全按照我的意愿工作。仍然愿意接受有关如何使其变得更好的建议。

    最佳答案

    我认为这可能是解决它的一种方法:

    this.valueEmitted$
      .pipe(
        concatMap(
          (v) => concat(
            of(v),
            timer(2000).pipe(ignoreElements())
          )
        )
      )
      .subscribe((value) => {
        this.displayedValue = value;
      });
    
    因为timer也会发出一些值(0、1、2 等等),我们使用 ignoreElements因为我们对这些值和我们想要的 timer 不感兴趣是 完成通知这将指示可以订阅下一个内部 observable 的时刻。请注意,前面提到的内部 observable 是通过调用 concatMap 创建的。的回调函数与 v作为论据。
    Working demo.

    Every event fired by the observable should be handled by the order it was triggered (no skipping, throttling, debouncing any of the values).


    这在 concatMap 的帮助下得到保证.

    I should be able to define a set interval between the handling of 2 subsequent events.


    我们可以通过以下方式实现:
    concat(
      // First, we send the value.
      of(v),
      // Then we set up a timer so that in case a new notification is emitted, it will have to
      // wait for the timer to end.
      timer(2000).pipe(ignoreElements())
    )
    

    In case no previous events are currently handled, the next event should be handled immediately (no delay).


    如果 concatMap 没有创建事件的内部 observables , 那么值 v将立即发送给消费者。

    这也可以:
    this.valueEmitted$
      .pipe(
        concatMap(
          (v) => timer(2000).pipe(
            ignoreElements(), startWith(v)
          )
        )
      )
      .subscribe((value) => {
        this.displayedValue = value;
      });
    

    关于javascript - RxJS Operator 延迟处理后续事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68414582/

    相关文章:

    angular - 错误 TS2300 : Duplicate identifier 'export='

    angular - 如何检查 typescript + Angular 中的变量类型?

    Angular - [(ngModel)] vs [ngModel] vs (ngModel)

    javascript - 出现错误无法解构 'protocol' 的属性 'window.location',因为它在构建时在 nextjs 应用程序中未定义

    javascript - Angular CLI - 如何在整个应用程序中共享原型(prototype)函数

    javascript - div的css过渡效果

    javascript - 如何在 JavaScript Web Worker 中异步解压缩 gzip 文件?

    reactjs - 如何在 Amplify 数据存储中创建动态查询

    javascript - Gulp.js - 在文件开头注入(inject)项目详细信息

    javascript - 模板中带有脚本标签的广告 [Vue.js]