Redis - 使用 BRPOPLPUSH 时清理处理队列的更好方法(可靠)

标签 redis queue reliability

我们目前的设计

环境 Redis 2.8.17

我们已经实现了我们的可靠队列,使用类似于 redis 文档中描述的模式的模式,在 RPOPLPUSH

但是,考虑到其阻塞性质,我们正在使用 BRPOPLPUSH,并使用 LPUSH 来确保 FIFO 顺序。

生产者:多个线程(来自多个服务器)使用 LPUSH 推送项目。

消费者:多个线程(来自多个服务器)使用 BRPOPLPUSH 来处理项目。

BRPOPLPUSH q processing-q

如文档所述,redis 从队列“q”中弹出项目,同时将它们添加到“processing-q”中。

问题

由于我们应用程序的多线程(异步)特性,我们无法控制消费者何时完成他们的处理。

因此,如果我们使用 LREM(根据文档)从 processing-q 中删除已处理的元素>,这只会删除 processing-q 的顶部元素。由于它无法保证是否已删除由相应消费者处理的实际元素。

因此,如果我们什么都不做,processing-q 会继续增长(消耗内存),恕我直言,这是非常糟糕的。

有什么建议或想法吗?

最佳答案

您只需在对 LREM 的调用中包含要删除的作业。

LREM 采用以下形式:

LREM queue count "object"

它将从队列 中删除count 个等于"object" 的项目。因此,要删除您的消费者线程正在处理的特定工作,您需要做这样的事情。

LREM processing-q 1 "job_identifier"

有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem

然后,为了处理崩溃的消费者和被放弃的作业,您可以使用 SETEX 创建具有到期时间的锁,并定期检查没有锁的作业。

所以整个过程是这样的:

制作人

  1. RPUSH q "job_identifier"

消费者

  1. SETEX lock:processing-q:job_identifier 60(先设置锁以避免竞争条件)
  2. BRPOPLPUSH q 处理队列
  3. 处理作业
  4. LREM 处理队列“job_identifier”

过期作业监视器

  1. jobs = LRANGE processing-queue 0 -1
  2. foreach job in jobs : lock = GET lock:processing-q:job_identifier
  3. 如果锁为空,则此作业超时,因此从 processing-q LREM processing-queue "job_identifier"
  4. 中移除
  5. 并使用 RPUSH q "job_identifier" 重试

@NotAUser 发布了一个开源 java 实现,此处:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq

关于Redis - 使用 BRPOPLPUSH 时清理处理队列的更好方法(可靠),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27986649/

相关文章:

c - libmcrypt 不可靠吗?

node.js - 未处理的拒绝 RangeError : Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis

symfony - 更新后刷新缓慢(Doctrine ORM + Symfony 2)

http - 使用异步队列连接 http 请求/响应模型

java:用于存储挂起的网络请求的线程安全数据结构(队列+映射)?

python - 迭代相同的 freezeset 是否保证始终以相同的顺序生成项目? (Python 3)

redis - 集合成员的 TTL

mongodb - redis vs mongodb - 从主服务器复制到运行内存数据库的从服务器

javascript - 在 Typescript 中实现 Bull Queue

installation - 运行 shell 脚本时,如何保护它不被覆盖或截断文件?