rust - 对堆积的 mpsc channel 消息进行去重

标签 rust

当使用 channel 时,假设在给定时刻,给定接收者的队列中有以下消息:

abc
blah
abc
something
something
blah
something

通常消息是这样处理的:

let (sender, receiver) = mpsc::channel::<Msg>();
for msg in receiver {
   // Process a message...
}

删除已堆积的重复邮件的最简单方法是什么?

使用上面的示例,我只想处理 3 条(而不是 7 条)消息:

abc
blah
something

注意:处理完一条消息 abc 后,以后可以再次接收(并处理)另一条消息 abc。目标是仅删除已堆积在队列中的重复消息。

最佳答案

解决方案1

这是一个示例解决方案:

loop {
    let deduped_msgs : HashSet<Msg> = HashSet::from_iter(receiver.try_iter());
    for msg in deduped_msgs {
        // Process a message...                
    }
    thread::sleep(time::Duration::from_millis(10));
}

上面使用的是非阻塞try_iter方法。我添加了一个简短的 sleep() 以避免 CPU 承受过大压力。

解决方案2

根据 Chayim 的建议,这里有一个类似的解决方案,它避免了每 X 毫秒定期检查的需要:

let mut deduped_msgs = HashSet::new();
loop {
    deduped_msgs.insert(receiver.recv().unwrap());
    deduped_msgs.extend(receiver.try_iter());               
    for msg in deduped_msgs.drain() {
        // Process a message... 
    }
}

处理顺序

如果消息的处理顺序很重要,HashSet (单独)可能不是最好的选择。

  • 要按照消息到达的顺序处理消息(仅考虑每条消息的第一次出现),可以使用 IndexSet例如。
  • 在某些情况下,消息的自然顺序可能更有意义,然后 BTreeSet可能是最简单的选择。
  • 要应用某种自定义优先级排序,可以 例如,利用向量和 sort it通过 sort_unstable_by 或者 sort_unstable_by_key (首先使用 HashSet 对它们进行重复数据删除可能更简单,但另一个 选项可能是 dedup 已经排序的向量)。

关于rust - 对堆积的 mpsc channel 消息进行去重,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76562554/

相关文章:

rust - 创建默认结构的最惯用方法

rust - "unresolved import -- maybe a missing extern"extern声明存在时

rust - 如何按插入顺序对 map 进行排序?

serialization - 如何从 Rust 中的二进制文件中读取 C 结构?

rust - 如何为实现特定特征的所有类型批量实现反序列化?

rust - 为实现 Trait 的类型实现 Borrow<Trait>

rust - serde skip 属性实际上是否跳过枚举变体?

multithreading - 无法迭代 Arc Mutex

pointers - 为什么 Deref::deref 的返回类型本身就是一个引用?

rust - Rust 中的 "*const ()"到底是什么?