node.js - 在上一个作业完成之前不要处理下一个作业(BullJS/Redis)?

标签 node.js typescript redis bull

基本上,每个客户端 --- 具有 clientId与他们关联---可以推送消息和重要的是,在第一个消息完成处理之前,不会处理来自同一客户端的第二个消息 (即使客户端可以连续发送多个消息,并且它们是有序的,并且多个发送消息的客户端在理想情况下应该不会相互干扰)。而且,重要的是,一个工作不应该被处理两次。
我认为使用 Redis 可能能够解决这个问题,我开始使用 Bull 库进行一些快速原型(prototype)设计,但我显然做得不好,我希望有人知道如何继续。
这是我到目前为止所尝试的:

  • 使用 clientId 创建作业并将它们添加到一个进程的相同队列名称中作为工作名称。
  • 在 2 个单独的进程上等待大量随机时间的同时消耗作业。
  • 我尝试添加我正在使用的库提供的默认锁定( bull ),但它锁定在 jobId 上,它对于每个作业都是唯一的,而不是在 clientId 上。

  • 我想要发生的事情:
  • 其中一位消费者不能从同一个 clientId 接手工作直到前一个完成处理它。
  • 然而,他们应该能够从不同的clientId 中获取项目。 s 并行没有问题(异步)。 (我还没走到这一步,我现在只处理一个 clientId )

  • 我得到什么:
  • 两个消费者都从队列中消费尽可能多的项目,而无需等待 clientId 的前一个项目。要完成的。

  • Redis 甚至是适合这项工作的工具吗?
    示例代码
    // ./setup.ts
    import Queue from 'bull';
    import * as uuid from 'uuid';
    
    // Check that when a message is taken from a place, no other message is taken
    
    // TO do that test, have two processes that process messages and one that sets messages, and make the job take a long time
    
    // queue for each room https://stackoverflow.com/questions/54178462/how-does-redis-pubsub-subscribe-mechanism-works/54243792#54243792
    // https://groups.google.com/forum/#!topic/redis-db/R09u__3Jzfk
    
    // Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353
    
    export async function sleep(ms: number): Promise<void> {
      return new Promise((resolve) => {
        setTimeout(resolve, ms);
      });
    }
    export interface JobData {
      id: string;
      v: number;
    }
    export const queue = new Queue<JobData>('messages', 'redis://127.0.0.1:6379');
    
    queue.on('error', (err) => {
      console.error('Uncaught error on queue.', err);
      process.exit(1);
    });
    
    export function clientId(): string {
      return uuid.v4();
    }
    
    export function randomWait(minms: number, maxms: number): Promise<void> {
      const ms = Math.random() * (maxms - minms) + minms;
      return sleep(ms);
    }
    
    // Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    //@ts-ignore
    queue.LOCK_RENEW_TIME = 5 * 60 * 1000;
    
    
    // ./create.ts
    import { queue, randomWait } from './setup';
    
    const MIN_WAIT = 300;
    const MAX_WAIT = 1500;
    async function createJobs(n = 10): Promise<void> {
      await randomWait(MIN_WAIT, MAX_WAIT);
      // always same Id
      const clientId = Math.random() > 1 ? 'zero' : 'one';
      for (let index = 0; index < n; index++) {
        await randomWait(MIN_WAIT, MAX_WAIT);
        const job = { id: clientId, v: index };
        await queue.add(clientId, job).catch(console.error);
        console.log('Added job', job);
      }
    }
    
    export async function create(nIds = 10, nItems = 10): Promise<void> {
      const jobs = [];
      await randomWait(MIN_WAIT, MAX_WAIT);
      for (let index = 0; index < nIds; index++) {
        await randomWait(MIN_WAIT, MAX_WAIT);
        jobs.push(createJobs(nItems));
        await randomWait(MIN_WAIT, MAX_WAIT);
      }
      await randomWait(MIN_WAIT, MAX_WAIT);
      await Promise.all(jobs)
      process.exit();
    }
    
    (function mainCreate(): void {
      create().catch((err) => {
        console.error(err);
        process.exit(1);
      });
    })();
    
    
    // ./consume.ts
    import { queue, randomWait, clientId } from './setup';
    
    function startProcessor(minWait = 5000, maxWait = 10000): void {
      queue
        .process('*', 100, async (job) => {
          console.log('LOCKING: ', job.lockKey());
          await job.takeLock();
          const name = job.name;
          const processingId = clientId().split('-', 1)[0];
          try {
            console.log('START: ', processingId, '\tjobName:', name);
            await randomWait(minWait, maxWait);
            const data = job.data;
            console.log('PROCESSING: ', processingId, '\tjobName:', name, '\tdata:', data);
            await randomWait(minWait, maxWait);
            console.log('PROCESSED: ', processingId, '\tjobName:', name, '\tdata:', data);
            await randomWait(minWait, maxWait);
            console.log('FINISHED: ', processingId, '\tjobName:', name, '\tdata:', data);
          } catch (err) {
            console.error(err);
          } finally {
            await job.releaseLock();
          }
        })
        .catch(console.error); // Catches initialization
    }
    
    startProcessor();
    
    
    这是使用 3 个不同的进程运行的,您可能会这样称呼它们(尽管我使用不同的选项卡来更清楚地了解正在发生的事情)
    npx ts-node consume.ts & 
    npx ts-node consume.ts &
    npx ts-node create.ts &
    

    最佳答案

    我不熟悉 node.js。但是对于 Redis,我会尝试这个,
    假设您有client_1,client_2,它们都是事件的发布者。
    你有三台机器,consumer_1、consumer_2、consumer_3。

  • 在redis中建立一个任务列表,例如JOB_LIST。
  • 客户将 (LPUSH) 作业以特定形式放入此 JOB_LIST,例如“CLIENT_1:[jobcontent]”、“CLIENT_2:[jobcontent]”
  • 每个消费者以阻塞方式取出作业(Redis 的 RPOP 命令)并处理它们。
    比如consumer_1拿出一份工作,内容是CLIENT_1:[jobcontent]。它解析内容并识别它来自 CLIENT_1。然后它想检查其他消费者是否已经在处理 CLIENT_1,如果没有,它将锁定 key 以指示它正在处理 CLIENT_1。

  • 它继续使用 Redis SETNX 命令(如果该键不存在则设置)设置一个键“CLIENT_1_PROCESSING”,内容为“consumer_1”,并具有适当的超时时间。例如,任务通常需要一分钟才能完成,您将 key 的超时设置为五分钟,以防万一 consumer_1 崩溃并无限期地持有锁。
    如果 SETNX 返回 0,则表示获取 CLIENT_1 的锁失败(有人已经在处理 client_1 的作业)。然后它通过使用 Redis LPUSH 命令将作业(“CLIENT_1:[jobcontent]”的值)返回到 JOB_LIST 的左侧。然后它可能会等待一段时间(休眠几秒钟),然后从右侧 RPOP 另一个任务LIST 的一侧。如果这一次 SETNX 返回 1,则 consumer_1 获取锁。它继续处理作业,完成后,它删除“CLIENT_1_PROCESSING”的键,释放锁。然后它继续 RPOP 另一个工作,依此类推。
    需要考虑的一些事项:
  • JOB_LIST 不公平,例如,较早的作业可能稍后处理
  • 锁定部分有点简陋,但足够了。

  • - - - - - 更新 - - - - - - -
    我想出了另一种保持任务井井有条的方法。
    对于每个客户(生产者),建立一个列表。像“client_1_list”一样,将作业推送到列表的左侧。
    将所有客户端名称保存在列表“client_names_list”中,值为“client_1”、“client_2”等。
    对于每个消费者(处理器),迭代“client_names_list”,例如consumer_1得到一个“client_1”,检查client_1的key是否被锁定(有人已经在处理client_1的任务),如果没有,就弹出一个值(作业)来自 client_1_list 并锁定 client_1。如果 client_1 被锁定,(可能休眠一秒钟)并迭代到下一个客户端,例如“client_2”,并检查 key 等等。
    这样,每个客户(任务生产者)的任务都是按照他们的进入顺序来处理的。

    关于node.js - 在上一个作业完成之前不要处理下一个作业(BullJS/Redis)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62831610/

    相关文章:

    apache-spark - Spark-redis:数据帧写入时间太慢

    node.js - 如何动态加载多个React组件?

    mysql - 我的 node.js 服务器挂起 - 直到我发出 Ctrl+C

    node.js - Redis/我可以删除所有哈希表吗?

    Node.js 版本 : Recommended vs Current?

    javascript - 在 vue/typescript 中解析 CSV 文件

    typescript - 如何根据输入参数键入返回类型

    javascript - 如何通知另一个组件该方法执行

    go - 动态更新Redis排序集中的元素等级

    php - 在 Redis 中存储 JSON 编码的对象