我正在寻找消息调度的最佳算法。我所说的消息调度是指当我们有许多消费者以不同的速率在总线上发送消息的方法。
例子: 假设我们有数据 D1 到 Dn . D1 每 5ms 发送给多个消费者 C1,C2 每 19ms,C3 每 30ms,Cn 每 Rn ms . Dn每10ms发送给C1,C2每31ms,Cn每50ms
以最佳性能(CPU、内存、IO)安排此操作的最佳算法是什么?
问候
最佳答案
我可以想到很多选项,每个选项都有自己的成本和 yield 。这真的归结为您的需求是什么——真正定义“最佳”的是什么。我在下面对几种可能性进行了伪编码,希望能帮助您入门。
选项 1:每个时间单位(在您的示例中为毫秒)执行以下操作
func callEachMs
time = getCurrentTime()
for each datum
for each customer
if time % datum.customer.rate == 0
sendMsg()
这样做的好处是不需要一致存储的内存——您只需在每个时间单位检查您是否应该发送消息。这也可以处理未在 time == 0
发送的消息——只需存储消息最初发送的时间模速率,并将条件替换为 if time % datum .customer.rate == data.customer.firstMsgTimeMod
.
此方法的缺点是它完全依赖于始终以 1 毫秒的速率调用。如果 CPU 上的另一个进程导致延迟并且它错过了一个周期,您可能会完全错过发送消息(而不是稍晚发送)。
选项 2:维护一个元组列表列表,其中每个条目代表需要在该毫秒内完成的任务。使您的列表至少与最长速率除以时间单位一样长(如果您的最长速率为 50 毫秒并且您要按毫秒计算,则您的列表必须至少有 50 长)。当你启动你的程序时,放置第一个消息将被发送到队列中。然后每次发送消息时,在下次发送时更新该列表中的消息。
func buildList(&list)
for each datum
for each customer
if list.size < datum.customer.rate
list.resize(datum.customer.rate+1)
list[customer.rate].push_back(tuple(datum.name, customer.name))
func callEachMs(&list)
for each (datum.name, customer.name) in list[0]
sendMsg()
list[customer.rate].push_back((datum.name, customer.name))
list.pop_front()
list.push_back(empty list)
这样做的好处是避免了选项 1 所需的许多不必要的模数计算。然而,这伴随着增加内存使用的成本。如果您的各种消息的速率存在很大差异,则此实现也不会有效(尽管您可以修改它以更有效地处理具有更长速率的算法)。而且它仍然必须每毫秒调用一次。
最后,您必须非常仔细地考虑您使用的数据结构,因为这将对其效率产生巨大影响。因为你在每次迭代中从前面弹出并从后面插入,而且列表的大小是固定的,所以你可能想要实现一个 circular buffer。以避免不必要的值移动。对于元组列表,因为它们只会被迭代(不需要随机访问),并且会频繁添加,所以单链表可能是您的最佳解决方案。
.
显然,您可以通过更多方法来执行此操作,但希望这些想法可以帮助您入门。另外,请记住,您正在运行的系统的性质可能会对哪种方法效果更好,或者您是否想完全做其他事情产生很大影响。例如,这两种方法都要求它们可以以特定速率可靠地调用。我也没有描述并行化实现,如果您的应用程序支持它们,这可能是最佳选择。
关于algorithm - 安排以不同的速率向消费者发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50698807/