javascript - Rxjs 订阅队列

标签 javascript firebase rxjs reactive-programming

我的 Angular 应用程序中有一个 firebase 订阅,它会触发多次。 如何实现任务作为队列处理,以便我可以同步运行每个任务一次?

this.tasks.subscribe(async tasks => {
   for (const x of tasks) 
      await dolongtask(x); // has to be sync
      await removetask(x);
   });

问题是 subribe 事件在 longtask 仍在处理时触发。

最佳答案

恕我直言,我会尝试利用 rxjs 的强大功能,因为无论如何我们已经在这里使用它,并且避免按照另一个答案的建议实现自定义队列概念(尽管您当然可以这样做)。

如果我们稍微简化给定的情况,我们只有一些可观察的,并且想要为每个发射执行一个长时间运行的过程——按顺序。 rxjs 允许通过 concatMap 运算符实现这一点,本质上是开箱即用的:

$data.pipe(concatMap(item => processItem(item))).subscribe();

这仅假设 processItem 返回一个可观察对象。由于您使用了 await,我假设您的函数当前返回 Promises。这些可以使用 from 简单地转换为 observables。

从 OP 中唯一需要注意的细节是,可观察对象实际上发出了一个项目数组,我们希望对每个发射的每个项目执行操作。为此,我们只需使用 mergeMap 将可观察对象展平。


让我们把它们放在一起。请注意,如果您不准备一些 stub 数据和日志记录,则实际实现只有行代码(使用 mergeMap + concatMap)。

const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;

// Stub for the long-running operation
function processTask(task) {
  console.log("Processing task: ", task);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log("Finished task: ", task);
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}

// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));

// Some stubbed data stream
const tasks$ = interval(250).pipe(
  take(9),
  bufferCount(3),
);

tasks$.pipe(
  tap(task => console.log("Received task: ", task)),
  // Flatten the tasks array since we want to work in sequence anyway
  mergeMap(tasks => tasks),
  // Process each task, but do so consecutively
  concatMap(task => processTask$(task)),
).subscribe(null, null, () => console.log("Done"));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>

关于javascript - Rxjs 订阅队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52246393/

相关文章:

javascript - 如何在 iOS 中为 HTML、CSS 和 Javascript 页面制作包装器

firebase - 如何使用 Firebase 处理自动服务器端计算?

firebase - Firebase 控制台中的实时数据库未加载数据

node.js - 使用代理时,运算符的 Rxjs 不发出值

rxjs - 使用 combineLatest 将 n+1 个流添加到 RXJS 流

javascript - 在 import 语句中使用解构

javascript - 我怎样才能以正确的方式编写这个 jquery on() ?

javascript - InvalidCharacterError recaptcha__en.js

html - 登录时隐藏可选的导航栏菜单 - Angular 8

javascript - Angular - 每 30 秒调用一次函数,但考虑运行该函数所需的时间