multithreading - 避免闭包——将线程变量封装在一个结构体中

标签 multithreading rust

我正在基于 Rust websocket crate 编写一个名为 BoltServer 的简单 websocket 服务器(代码不完整,我刚刚开始)。我正在使用示例程序作为基础。然而示例程序不是模块化的(有很长的方法)。所以我试图将它们分解成结构和方法。我想为每个客户端生成两个线程。一个发送消息,另一个接收消息。所以在这里,我想把线程使用的所有变量都捕获到一个struct中,然后调用impl中的run方法。

extern crate websocket;
extern crate time;
extern crate rustc_serialize;

pub mod ws {
    use std::thread;
    use std::sync::{Arc, Mutex};
    use std::sync::mpsc;
    use std::net::ToSocketAddrs;
    use websocket;
    use websocket::{Server, Message, Sender, Receiver};
    use websocket::server::Connection;
    use websocket::stream::WebSocketStream;
    use std::str::from_utf8;

    struct BoltUser {
        user_id: u32,
        my_tx: mpsc::Sender<String>,
    }

    struct Broadcaster {
        my_rx: mpsc::Receiver<String>,
    }
    impl Broadcaster {
        fn new(receiver: mpsc::Receiver<String>) -> Broadcaster {
            Broadcaster { my_rx: receiver }
        }
        fn run(self) {
            while let Ok(msg) = self.my_rx.recv() {
                println!("Broadcaster got message: {}", msg);
            }
        }
    }

    struct SocketReader {}
    impl SocketReader {
        fn run(self) {}
    }

    struct SocketWriter {
        my_rx: mpsc::Receiver<String>,
        sender: Sender,
    }
    impl SocketWriter {
        fn run(self) {
            while let Ok(message) = self.my_rx.recv() {
            }
        }
    }

    pub struct BoltServer {
        address: String,
        connected_users: Arc<Mutex<Vec<BoltUser>>>,
    }
    impl BoltServer {
        pub fn new(address: &str) -> BoltServer {
            BoltServer {
                address: address.to_string(),
                connected_users: Arc::new(Mutex::new(vec![])),
            }
        }
        fn handshake(&mut self,
                     connection: Connection<WebSocketStream, WebSocketStream>)
                     -> (SocketWriter, SocketReader) {
            let request = connection.read_request().unwrap();
            // println!("thread-> Accepting request...");
            let response = request.accept();
            let (mut sender, mut receiver) = response.send().unwrap().split();
            let (user_tx, user_rx) = mpsc::channel::<String>();//Create a channel for writer
            let socket_writer = SocketWriter {
                my_rx: user_rx,
                sender: sender,
            };
            let socket_reader = SocketReader {};
            (socket_writer, socket_reader)
        }
        pub fn start(&mut self) {
            println!("Starting");
            let (broadcaster_tx, broadcaster_rx) = mpsc::channel::<String>();
            let broadcaster = Broadcaster::new(broadcaster_rx);
            let handle = thread::Builder::new()
                .name("Broadcaster".to_string())
                .spawn(move || broadcaster.run());

            let server = Server::bind(&*self.address).unwrap();

            let mut user_id: u32 = 0;

            // Block and process connection request from a new client
            for connection in server {
                user_id = user_id + 1;//Create a new user id
                let (socket_writer, socket_reader) = self.handshake(connection);
                thread::Builder::new()
                    .name("Socket writer".to_string())
                    .spawn(move || socket_writer.run());
                thread::Builder::new()
                    .name("Socket reader".to_string())
                    .spawn(move || socket_reader.run());
            }

            handle.unwrap().join();
            println!("Finished");
        }
    }
}

下面的代码给出了我想要实现的目标。

// Block and process connection request from a new client
for connection in server {
    user_id = user_id + 1;//Create a new user id
    let (socket_writer, socket_reader) = self.handshake(connection);
    thread::Builder::new().name("Socket writer".to_string()).spawn(move || {
        socket_writer.run()
    });
    thread::Builder::new().name("Socket reader".to_string()).spawn(move || {
        socket_reader.run()
    });
}

这里卡在了握手的方法上。我无法使用通过调用库中的 split 方法获得的发件人来初始化 SocketWriter 结构。我收到以下编译错误:

    error[E0038]: the trait `websocket::Sender` cannot be made into an object
  --> src/lib.rs:46:9
   |
46 |         sender:Sender,
   |         ^^^^^^^^^^^^^ the trait `websocket::Sender` cannot be made into an object
   |
   = note: method `send_dataframe` has generic type parameters
   = note: method `send_message` has generic type parameters

最佳答案

错误告诉您眼前的问题:

46 |         sender:Sender,
   |         ^^^^^^^^^^^^^ the trait `websocket::Sender` cannot be made into an object

首先,变量/字段不能有普通特征类型(但 &Trait 是可能的),而且 websocket::Sender 也可以特征不是 object safe ;它具有不能动态工作的通用方法(即 vtable 方法必须具有固定类型)。

相反,您必须有一个具体类型(您也可以将其设为通用结构)。

正确的类型是什么并不明显,所以我想让编译器告诉我。所以首先尝试最简单的方法:

    sender: (),

编译器返回一些信息:

|                       ^^^^^^ expected (), found struct `websocket::client::Sender`

好的,让我们插入:

    sender: websocket::client::Sender,

这给出了:

46 |         sender: websocket::client::Sender,
   |                 ^^^^^^^^^^^^^^^^^^^^^^^^^ expected 1 type arguments, found 0

好的,这是一个通用类型。下次尝试:

    sender: websocket::client::Sender<()>,

最后它给了我们真正的类型:

74 |                sender:sender,
   |                       ^^^^^^ expected (), found enum `websocket::WebSocketStream`

所以我们终于可以完成 SocketWriter :

struct SocketWriter {
    my_rx: mpsc::Receiver<String>,
    sender: websocket::client::Sender<websocket::WebSocketStream>,
}

connection 后出现以下编译错误你得到的是 Result<>所以你需要检查错误(如果我更改为 self.handshake(connection.unwrap()) 它会编译,但这显然不是最佳实践。

关于multithreading - 避免闭包——将线程变量封装在一个结构体中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40798945/

相关文章:

rust - 为什么借用仍然保留在 if let 的 else block 中?

rust - GitHub Actions 缓存 Rust 工件

rust - 在 rust 中使用 u32 整数类型

c# - 保持定时器类型的线程

multithreading - Spring 批处理 : problems (mix data) when converting to multithread

c++ - 纳秒级C++程序空转/节流

arrays - Rust:数组长度的宏

rust - 如果它们等于最接近于零,有没有办法告诉 rust 选择正数(例如 : -2 0 2) => 2

java - 如何防止共享同一个对象?

ios - 当我的 UITableViewCell 不再可见时,如何取消对象中的 NSOperation?