我的 Angular 应用程序中有一个 firebase 订阅,它会触发多次。 如何实现任务作为队列处理,以便我可以同步运行每个任务一次?
this.tasks.subscribe(async tasks => {
for (const x of tasks)
await dolongtask(x); // has to be sync
await removetask(x);
});
问题是 subribe 事件在 longtask 仍在处理时触发。
最佳答案
恕我直言,我会尝试利用 rxjs 的强大功能,因为无论如何我们已经在这里使用它,并且避免按照另一个答案的建议实现自定义队列概念(尽管您当然可以这样做)。
如果我们稍微简化给定的情况,我们只有一些可观察的,并且想要为每个发射执行一个长时间运行的过程——按顺序。 rxjs 允许通过 concatMap
运算符实现这一点,本质上是开箱即用的:
$data.pipe(concatMap(item => processItem(item))).subscribe();
这仅假设 processItem
返回一个可观察对象。由于您使用了 await
,我假设您的函数当前返回 Promises。这些可以使用 from
简单地转换为 observables。
从 OP 中唯一需要注意的细节是,可观察对象实际上发出了一个项目数组,我们希望对每个发射的每个项目执行操作。为此,我们只需使用 mergeMap
将可观察对象展平。
让我们把它们放在一起。请注意,如果您不准备一些 stub 数据和日志记录,则实际实现只有两行代码(使用 mergeMap + concatMap)。
const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;
// Stub for the long-running operation
function processTask(task) {
console.log("Processing task: ", task);
return new Promise(resolve => {
setTimeout(() => {
console.log("Finished task: ", task);
resolve(task);
}, 500 * Math.random() + 300);
});
}
// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));
// Some stubbed data stream
const tasks$ = interval(250).pipe(
take(9),
bufferCount(3),
);
tasks$.pipe(
tap(task => console.log("Received task: ", task)),
// Flatten the tasks array since we want to work in sequence anyway
mergeMap(tasks => tasks),
// Process each task, but do so consecutively
concatMap(task => processTask$(task)),
).subscribe(null, null, () => console.log("Done"));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
关于javascript - Rxjs 订阅队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52246393/