multithreading - 实现 "move"线程语义

标签 multithreading rust messaging zero-copy

我想写一个这样调用的函数:

send("message","address");

其他线程在做什么

let k = recv("address");
println!("{}",k);

看到消息

特别是消息可能很大,因此我希望使用“移动”或“零复制”语义来发送消息。

在 C 中,解决方案类似于:

  1. 在堆上分配消息
  2. 有一个全局的、线程安全的 hashmap,将“地址”映射到某个内存位置
  3. 在发送时将指针写入内存位置,并使用信号量唤醒接收者
  4. 在接收时从内存位置读取指针,并等待信号量处理新消息

但是根据另一个 SO 问题,步骤 #2“sounds like a bad idea”。所以我希望看到一种更符合 Rust 习惯的方法来解决这个问题。

最佳答案

您会自动获得这些移动语义,并通过将大值放入 Box 中来实现轻量级移动。 (即在堆上分配它们)。使用 type ConcurrentHashMap<K, V> = Mutex<HashMap<K, V>>;作为线程安全 HashMap (有多种方法可以改进),可能有:

use std::collections::{HashMap, RingBuf};
use std::sync::Mutex;

type ConcurrentHashMap<K, V> = Mutex<HashMap<K, V>>;

lazy_static! {
    pub static ref MAP: ConcurrentHashMap<String, RingBuf<String>> = {
        Mutex::new(HashMap::new())
    }
}

fn send(message: String, address: String) {
    MAP.lock()
       // find the place this message goes
       .entry(address)
       .get()
       // create a new RingBuf if this address was empty
       .unwrap_or_else(|v| v.insert(RingBuf::new()))
       // add the message on the back
       .push_back(message)
}
fn recv(address: &str) -> Option<String> {
     MAP.lock()
        .get_mut(address)
        // pull the message off the front
        .and_then(|buf| buf.pop_front())
}

该代码使用了 lazy_static! 宏来实现全局 HashMap (最好使用包装 Arc<ConcurrentHashMap<...> 的本地对象,fwiw,因为全局状态会使对程序行为的推理变得困难)。它还使用 RingBuf作为队列,以便消息为给定的 address 存储起来.如果您一次只希望支持一条消息,则类型可以是 ConcurrentHashMap<String, String> , send可能变成MAP.lock().insert(address, message)recv只是 MAP.lock().remove(address) .

(注意。我还没有编译这个,所以类型可能不完全匹配。)

关于multithreading - 实现 "move"线程语义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28179867/

相关文章:

c++ - "Correct"发送 UDP 数据报序列的方式?

java - 使用 Apache 发出 http 请求时如何捕获 InterruptedException?

c++ - 如何使用多核而不是多线程进行编程?

c# - 处理实时数据

generics - 通用特征和生命周期的问题

rust - 如何检测源地址是IPv4还是IPv6

Java多线程消息传递

c++ - 在析构函数中加入一个 boost::thread 实例

rust - Cargo run 忽略指定给 cargo build 的特性

java - 我应该使用什么 API 或框架来用 Java 实现这个消息系统?