multithreading - 如何使用 mpsc channel 在线程之间创建环形通信?

标签 multithreading rust channel

我想生成 n 个线程,它们能够与环形拓扑中的其他线程进行通信,例如线程 0 可以向线程 1 发送消息,线程 1 向线程 2 发送消息,以此类推,线程 n 向线程 0 发送消息。

这是我想用 n=3 实现的示例:

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

let (tx0, rx0): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx1, rx1): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = mpsc::channel();

let child0 = thread::spawn(move || {
    tx0.send(0).unwrap();
    println!("thread 0 sent: 0");
    println!("thread 0 recv: {:?}", rx2.recv().unwrap());
});
let child1 = thread::spawn(move || {
    tx1.send(1).unwrap();
    println!("thread 1 sent: 1");
    println!("thread 1 recv: {:?}", rx0.recv().unwrap());
});
let child2 = thread::spawn(move || {
    tx2.send(2).unwrap();
    println!("thread 2 sent: 2");
    println!("thread 2 recv: {:?}", rx1.recv().unwrap());
});

child0.join();
child1.join();
child2.join();

在这里,我在循环中创建 channel ,将它们存储在一个向量中,重新排序发送者,将它们存储在一个新向量中,然后生成线程,每个线程都有自己的 Sender-Receiver(tx1/rx0、tx2/rx1 等)对。

const NTHREADS: usize = 8;

// create n channels
let channels: Vec<(Sender<i32>, Receiver<i32>)> =
    (0..NTHREADS).into_iter().map(|_| mpsc::channel()).collect();

// switch tupel entries for the senders to create ring topology
let mut channels_ring: Vec<(Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

let mut children = Vec::new();
for i in 0..NTHREADS {
    let (tx, rx) = channels_ring.remove(i);

    let child = thread::spawn(move || {
        tx.send(i).unwrap();
        println!("thread {} sent: {}", i, i);
        println!("thread {} recv: {:?}", i, rx.recv().unwrap());
    });

    children.push(child);
}

for child in children {
    let _ = child.join();
}

这不起作用,因为无法复制 Sender 以创建新向量。 但是,如果我使用 refs (& Sender):

let mut channels_ring: Vec<(&Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            &channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

我无法生成线程,因为 std::sync::mpsc::Sender<i32>不能在线程之间安全地共享。

最佳答案

Senders 和 Receivers 不能共享,因此您需要将它们移动到各自的线程中。这意味着将它们从 Vec 中删除,或者在迭代时消耗 Vec - 向量不允许处于无效状态(有孔),即使作为中间步。使用 into_iter 遍历向量将通过使用它们来实现。

您可以使用一个小技巧来让发送者和接收者在一个循环中配对,即创建两个向量;发送者之一和接收者之一;然后旋转一个,以便每个向量中的相同索引将为您提供所需的对。

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

fn main() {
    const NTHREADS: usize = 8;

    // create n channels
    let (mut senders, receivers): (Vec<Sender<i32>>, Vec<Receiver<i32>>) =
        (0..NTHREADS).into_iter().map(|_| mpsc::channel()).unzip();

    // move the first sender to the back
    senders.rotate_left(1);

    let children: Vec<_> = senders
        .into_iter()
        .zip(receivers.into_iter())
        .enumerate()
        .map(|(i, (tx, rx))| {
            thread::spawn(move || {
                tx.send(i as i32).unwrap();
                println!("thread {} sent: {}", i, i);
                println!("thread {} recv: {:?}", i, rx.recv().unwrap());
            })
        })
        .collect();

    for child in children {
        let _ = child.join();
    }
}

关于multithreading - 如何使用 mpsc channel 在线程之间创建环形通信?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62249889/

相关文章:

c# - 锁语句有多贵?

c# - 动态创建线程

string - 如何将 Vec<String> 转换为 Vec<&str>?

java - 何时使用选择器以及何时使用阻塞 channel (性能)

c# - 线程化缓慢且不可预测?

python - 在一段时间后杀死线程的大多数 Pythonic 方法

xml - OwnedName 类型从何而来?

rust - Rust结构/枚举的可复制加密哈希

go - 如何在没有关联类型的情况下获取 channel "handle"

ruby-on-rails - 订阅 Action Cable channel 时如何设置参数