javascript - 使用流实现去抖分块异步队列

标签 javascript typescript rxjs reactive-programming

我是响应式(Reactive)编程的新手,好奇是否有更优雅的方式来实现去抖动分 block 异步队列。您问的去抖动分 block 异步队列是什么?好吧,也许有更好的名字,但想法是异步函数可能会在一段时间内被调用多次。我们想首先对函数的调用进行去抖动,并将所有这些参数打包成一个参数。其次,该异步函数的所有执行都应以 FIFO 方式序列化。

这是我在没有流、可观察对象或响应式(Reactive)编程的情况下实现的,正如我所看到的:

const debouncedChunkedQueue = <T>(
  fn: (items: T[]) => Promise<void> | void,
  delay = 1000
) => {
  let items: T[] = [];
  let started = false;
  const start = async () => {
    started = true;
    while (items.length) {
      await sleep(delay);
      const chunk = items.splice(0, items.length);
      await fn(chunk);
    }
    started = false;
  };
  const push = (item: T) => {
    items.push(item);
    if (!started) start();
  };
  return { push };
};

https://codesandbox.io/s/priceless-sanne-dkrkw?file=/src/index.ts:87-550

最佳答案

我 1:1 遵循了您当前的实现,这导致代码比您使用简单的 bufferTime 实现的代码更复杂。

您可以在RxViz中测试以下代码.

const { BehaviorSubject, fromEvent, of } = Rx;
const { buffer, delay, first, tap, mergeMap, exhaustMap } = RxOperators;

// Source of the queue - click to emit event
const items = fromEvent(document, 'click');

// a "pushback" source
const processing = new BehaviorSubject(false);

items.pipe(
  // we resubscribe to items to start only when an item is there
  buffer(
    // we take items again so we only
    // start if there are any items
    // to be processed and then we wait
    // 1 second
    items.pipe(
      // we do not start processing until
      // pending async call completes
      exhaustMap(() =>
        processing.pipe(
          // wait for processing end
          first(val => !val),
          // wait for 1 second
          delay(1000),
        ),
      ),
    ),
  ),
  tap(() => processing.next(true)),
  // basic mergeMap is okay, since we control that the next value
  // will come no sooner than this is completed
  mergeMap(items =>
    // async fn simulation
    of(items.length).pipe(delay(300 + Math.random() * 500)),
  ),
  tap(() => processing.next(false)),
);

不过,我们必须将状态保持在 observable 之外,这真是令人失望。

一个 debouncedChunkedQueue

您可以在stackblitz中测试以下代码

import { BehaviorSubject, Subject, of } from "rxjs";
import {
  buffer,
  delay,
  first,
  tap,
  mergeMap,
  exhaustMap
} from "rxjs/operators";

const debouncedChunkedQueue = <T>(
  fn: (items: T[]) => Promise<void> | void,
  delayMs = 1000
) => {
  // Source of the queue - click to emit event
  const items = new Subject<T>();

  // a "pushback" source
  const processing = new BehaviorSubject(false);

  items
    .pipe(
      // we resubscribe to items to start only when an item is there
      buffer(
        // we take items again so we only
        // start if there are any items
        // to be processed and then we wait
        // 1 second
        items.pipe(
          // we do not start processing until
          // pending async call completes
          exhaustMap(() =>
            processing.pipe(
              // wait for processing end
              first(val => !val),
              // wait for 1 second
              delay(delayMs)
            )
          )
        )
      ),
      tap(() => processing.next(true)),
      // basic mergeMap is okay, since we control that the next value
      // will come no sooner than this is completed
      mergeMap(
        items =>
          // TODO: Make sure to catch errors from the fn if you want the queue to recover
          // async fn simulation
          fn(items) || of(null)
      ),
      tap(() => processing.next(false))
    )
    .subscribe();
  return { push: (item: T) => items.next(item) };
};

关于javascript - 使用流实现去抖分块异步队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65407362/

相关文章:

javascript - 使用模式表单时,AngularJS 数据表未更新

javascript - 如何重置 Highcharts 中系列的样式?

reactjs - 使用 div React useRef 钩子(Hook), typescript ?

typescript - 如何通过在 Typescript 中使用模式操作属性来基于现有类型生成新类型

rx-java - Scala.js Observables 查询

javascript - 如何避免第一个未定义的订阅 JavaScript RxJS

Javascript Date() 时区不一致

Javascript 排序数字

typescript - 从基类静态方法返回子类的新实例

javascript - React Redux with redux-observable 在异步操作完成后使用路由器导航到不同的页面