我是响应式(Reactive)编程的新手,好奇是否有更优雅的方式来实现去抖动分 block 异步队列。您问的去抖动分 block 异步队列是什么?好吧,也许有更好的名字,但想法是异步函数可能会在一段时间内被调用多次。我们想首先对函数的调用进行去抖动,并将所有这些参数打包成一个参数。其次,该异步函数的所有执行都应以 FIFO 方式序列化。
这是我在没有流、可观察对象或响应式(Reactive)编程的情况下实现的,正如我所看到的:
const debouncedChunkedQueue = <T>(
fn: (items: T[]) => Promise<void> | void,
delay = 1000
) => {
let items: T[] = [];
let started = false;
const start = async () => {
started = true;
while (items.length) {
await sleep(delay);
const chunk = items.splice(0, items.length);
await fn(chunk);
}
started = false;
};
const push = (item: T) => {
items.push(item);
if (!started) start();
};
return { push };
};
https://codesandbox.io/s/priceless-sanne-dkrkw?file=/src/index.ts:87-550
最佳答案
我 1:1 遵循了您当前的实现,这导致代码比您使用简单的 bufferTime
实现的代码更复杂。
您可以在RxViz中测试以下代码.
const { BehaviorSubject, fromEvent, of } = Rx;
const { buffer, delay, first, tap, mergeMap, exhaustMap } = RxOperators;
// Source of the queue - click to emit event
const items = fromEvent(document, 'click');
// a "pushback" source
const processing = new BehaviorSubject(false);
items.pipe(
// we resubscribe to items to start only when an item is there
buffer(
// we take items again so we only
// start if there are any items
// to be processed and then we wait
// 1 second
items.pipe(
// we do not start processing until
// pending async call completes
exhaustMap(() =>
processing.pipe(
// wait for processing end
first(val => !val),
// wait for 1 second
delay(1000),
),
),
),
),
tap(() => processing.next(true)),
// basic mergeMap is okay, since we control that the next value
// will come no sooner than this is completed
mergeMap(items =>
// async fn simulation
of(items.length).pipe(delay(300 + Math.random() * 500)),
),
tap(() => processing.next(false)),
);
不过,我们必须将状态保持在 observable 之外,这真是令人失望。
一个 debouncedChunkedQueue
您可以在stackblitz中测试以下代码
import { BehaviorSubject, Subject, of } from "rxjs";
import {
buffer,
delay,
first,
tap,
mergeMap,
exhaustMap
} from "rxjs/operators";
const debouncedChunkedQueue = <T>(
fn: (items: T[]) => Promise<void> | void,
delayMs = 1000
) => {
// Source of the queue - click to emit event
const items = new Subject<T>();
// a "pushback" source
const processing = new BehaviorSubject(false);
items
.pipe(
// we resubscribe to items to start only when an item is there
buffer(
// we take items again so we only
// start if there are any items
// to be processed and then we wait
// 1 second
items.pipe(
// we do not start processing until
// pending async call completes
exhaustMap(() =>
processing.pipe(
// wait for processing end
first(val => !val),
// wait for 1 second
delay(delayMs)
)
)
)
),
tap(() => processing.next(true)),
// basic mergeMap is okay, since we control that the next value
// will come no sooner than this is completed
mergeMap(
items =>
// TODO: Make sure to catch errors from the fn if you want the queue to recover
// async fn simulation
fn(items) || of(null)
),
tap(() => processing.next(false))
)
.subscribe();
return { push: (item: T) => items.next(item) };
};
关于javascript - 使用流实现去抖分块异步队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65407362/