我们目前的设计
环境 Redis 2.8.17
我们已经实现了我们的可靠队列,使用类似于 redis 文档中描述的模式的模式,在 RPOPLPUSH 下
但是,考虑到其阻塞性质,我们正在使用 BRPOPLPUSH,并使用 LPUSH 来确保 FIFO 顺序。
生产者:多个线程(来自多个服务器)使用 LPUSH 推送项目。
消费者:多个线程(来自多个服务器)使用 BRPOPLPUSH 来处理项目。
BRPOPLPUSH q processing-q
如文档所述,redis 从队列“q”中弹出项目,同时将它们添加到“processing-q”中。
问题
由于我们应用程序的多线程(异步)特性,我们无法控制消费者何时完成他们的处理。
因此,如果我们使用 LREM(根据文档)从 processing-q 中删除已处理的元素>,这只会删除 processing-q 的顶部元素。由于它无法保证是否已删除由相应消费者处理的实际元素。
因此,如果我们什么都不做,processing-q 会继续增长(消耗内存),恕我直言,这是非常糟糕的。
有什么建议或想法吗?
最佳答案
您只需在对 LREM 的调用中包含要删除的作业。
LREM 采用以下形式:
LREM queue count "object"
它将从队列 中删除count 个等于"object" 的项目。因此,要删除您的消费者线程正在处理的特定工作,您需要做这样的事情。
LREM processing-q 1 "job_identifier"
有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem
然后,为了处理崩溃的消费者和被放弃的作业,您可以使用 SETEX 创建具有到期时间的锁,并定期检查没有锁的作业。
所以整个过程是这样的:
制作人
RPUSH q "job_identifier"
消费者
SETEX lock:processing-q:job_identifier 60
(先设置锁以避免竞争条件)BRPOPLPUSH q 处理队列
- 处理作业
LREM 处理队列“job_identifier”
过期作业监视器
- jobs =
LRANGE processing-queue 0 -1
- foreach job in jobs : lock =
GET lock:processing-q:job_identifier
- 如果锁为空,则此作业超时,因此从 processing-q
LREM processing-queue "job_identifier"
中移除
- 并使用
RPUSH q "job_identifier"
重试
@NotAUser 发布了一个开源 java 实现,此处:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq
关于Redis - 使用 BRPOPLPUSH 时清理处理队列的更好方法(可靠),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27986649/