redis - 如何创建分布式 'debounce' 任务来排空 Redis 列表?

标签 redis queue mutex distributed debouncing

我有以下用例:多个客户端推送到共享的 Redis 列表。一个单独的工作进程应该耗尽这个列表(处理和删除)。 Wait/multi-exec 已到位以确保一切顺利进行。

出于性能原因,我不想立即调用“排出”进程,而是在 x 毫秒后,从第一个客户端推送到(然后为空)列表的那一刻开始。

这类似于分布式下划线/lodash debounce function , 计时器在第一个项目进入时开始运行(即:'leading' 而不是 'trailing')

我正在寻找以容错方式可靠地执行此操作的最佳方法。

目前我倾向于以下方法:

  1. 使用Redis Set使用 NXpx 方法。这允许:
    • 仅将一个值(一个互斥体)设置到一个专用的键空间,如果它还不存在的话。这就是 nx 参数的用途
    • x 毫秒后使 key 过期。这就是 px 参数用于
  2. 如果可以设置该值,则此命令返回 1,这意味着之前不存在任何值。否则返回 01 表示当前客户端是自 Redis 列表耗尽后第一个运行该进程的客户端。因此,
  3. 此客户端将作业放在分布式队列中,该队列计划在 x 毫秒内运行。
  4. x 毫秒后,接收作业的工作人员开始清空列表。

这在纸面上可行,但感觉有点复杂。还有其他方法可以使它以分布式容错方式工作吗?

顺便说一句:Redis 和分布式队列已经到位,所以我认为使用它来解决这个问题不会造成额外的负担。

最佳答案

很抱歉,但正常的回应需要一堆文本/理论。因为你的问题很好,所以你已经写了一个很好的答案:)

首先我们应该定义术语。 underscore/lodash 中的'debounce'应该在David Corbacho’s article下学习解释:

Debounce: Think of it as "grouping multiple events in one". Imagine that you go home, enter in the elevator, doors are closing... and suddenly your neighbor appears in the hall and tries to jump on the elevator. Be polite! and open the doors for him: you are debouncing the elevator departure. Consider that the same situation can happen again with a third person, and so on... probably delaying the departure several minutes.

Throttle: Think of it as a valve, it regulates the flow of the executions. We can determine the maximum number of times a function can be called in certain time. So in the elevator analogy you are polite enough to let people in for 10 secs, but once that delay passes, you must go!

你问的是 debounce 因为第一个元素会被推到列表中:

So that, by analogy with the elevator. Elevator should go up after 10 minutes after the lift came first person. It does not matter how many people crammed into the elevator more.

在分布式容错系统的情况下,这应该被视为一组要求:

  1. 新列表的处理必须在插入第一个元素(即创建列表)后的 X 时间内开始。
  2. worker 崩溃不应该破坏任何东西。
  3. 无死锁。
  4. 无论 worker 数量是 1 还是 N,都必须满足第一个要求。

即你应该知道(以分布式方式)——一组工作人员必须等待,或者你可以开始列表处理。只要我们说出“分布式”和“容错”这两个词。这些概念总是引领他们的 friend :

  1. 原子性(例如通过阻塞)
  2. 预订

实践

在实践中,恐怕你的系统需要稍微复杂一点(也许你只是没有写过,而你已经有了)。

你的方法:

  1. 通过 SET NX PX 使用互斥锁进行悲观锁定。 NX 是一种保证,一次只有一个进程在做这项工作(原子性)。 PX 确保如果此进程发生问题,Redis 会释放锁(关于死锁的容错的一部分)。
  2. 所有工作人员都 try catch 一个互斥锁(每个列表键),因此只有一个工作人员会很高兴并在 X 次后处理列表。此过程可以更新互斥体的 TTL(如果需要更多时间)。如果进程崩溃 - 互斥量将在 TTL 后解锁并被其他工作人员获取。

我的建议

容错reliable queue processing在围绕 RPOPLPUSH 构建的 Redis 中:

  • RPOPLPUSH 项目从处理到特殊列表(每个列表每个 worker )。
  • 处理项目
  • 从特殊列表中删除项目

要求 因此,如果 worker 崩溃了,我们总是可以将损坏的消息从特殊列表返回到主列表。而 Redis 保证了 RPOPLPUSH/RPOP 的原子性。也就是只有一群有问题的 worker 等着。

然后是两个选项。首先 - 如果有很多客户和较少的 worker 使用 worker 一侧的锁定。因此,尝试在 worker 中锁定互斥量,如果成功 - 开始处理。

反之亦然。每次执行 LPUSH/RPUSH 时都使用 SET NX PX(如果您有很多工作人员和一些推送客户端,则使用“等待 N 次再从我那里弹出”解决方案)。所以推送是:

SET myListLock 1 PX 10000 NX 
LPUSH myList value

并且每个 worker 只检查 myListLock 是否存在,在设置处理互斥锁并开始排空之前,我们至少应该等待关键 TTL。

关于redis - 如何创建分布式 'debounce' 任务来排空 Redis 列表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26286241/

相关文章:

循环队列实现

c - 在 C 中使用互斥锁同步两个 pthread

c - 在pthread_cond_broadcast之后,哪个线程拥有关联的互斥锁?

.net - 退出上下文对于 WaitHandle.WaitOne 意味着什么?

redis - 使用 Redis 将唯一 ID 存储为包含其他键列表的键

redis - 具有主/从服务器的 StackExchange.Redis 客户端 - 如何处理失败的主服务器

java - 如何使用redis或memcached配置tomcat6或7共享 session ?

php - redis pub/sub 在 php 中是否现实?

api - Flutter 队列 API 请求稍后执行

MySql - 是否有查询队列