multithreading - 将 TcpStream + SslStream 分离成读写组件

标签 multithreading ssl tcp concurrency rust

我正在尝试制作使用由 openssl::ssl::SslStream(来自 crates.io)包装的 TcpStream 与服务器通信的客户端程序。它应该等待读取,并在没有延迟接收到从服务器发送的数据时处理数据。同时,无论是否读取,它都应该能够向服务器发送消息。

我尝试了一些方法比如

  1. 将单个流传递给读取和写入线程。 readwrite 方法都需要可变引用,因此我无法将单个流传递给两个线程。
  2. 我关注了In Rust how do I handle parallel read writes on a TcpStream , 但 TcpStream 似乎没有 clone 方法,SslStream 也没有。
  3. 我尝试使用 as_raw_fdfrom_raw_fd 制作 TcpStream 的副本:
fn irc_read(mut stream: SslStream<TcpStream>) {
    loop {
        let mut buf = vec![0; 2048];
        let resp = stream.ssl_read(&mut buf);
        match resp {
            // Process Message
        }
    }
}

fn irc_write(mut stream: SslStream<TcpStream>) {
    thread::sleep(Duration::new(3, 0));
    let msg = "QUIT\n";
    let res = stream.ssl_write(msg.as_bytes());
    let _ = stream.flush();
    match res {
        // Process
    }
}

fn main() {
    let ctx = SslContext::new(SslMethod::Sslv23).unwrap();
    let read_ssl = Ssl::new(&ctx).unwrap();
    let write_ssl = Ssl::new(&ctx).unwrap();

    let raw_stream = TcpStream::connect((SERVER, PORT)).unwrap();
    let mut fd_stream: TcpStream;
    unsafe {
        fd_stream = TcpStream::from_raw_fd(raw_stream.as_raw_fd());
    }
    let mut read_stream = SslStream::connect(read_ssl, raw_stream).unwrap();
    let mut write_stream = SslStream::connect(write_ssl, fd_stream).unwrap();

    let read_thread = thread::spawn(move || {
        irc_read(read_stream);
    });

    let write_thread = thread::spawn(move || {
        irc_write(write_stream);
    });

    let _ = read_thread.join();
    let _ = write_thread.join();
}

这段代码可以编译,但在第二个 SslStream::connect

上会出现错误
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Failure(Ssl(ErrorStack([Error { library: "SSL routines", function: "SSL23_GET_SERVER_HELLO", reason: "unknown protocol" }])))', ../src/libcore/result.rs:788
stack backtrace:
   1:     0x556d719c6069 - std::sys::backtrace::tracing::imp::write::h00e948915d1e4c72
   2:     0x556d719c9d3c - std::panicking::default_hook::_{{closure}}::h7b8a142818383fb8
   3:     0x556d719c8f89 - std::panicking::default_hook::h41cf296f654245d7
   4:     0x556d719c9678 - std::panicking::rust_panic_with_hook::h4cbd7ca63ce1aee9
   5:     0x556d719c94d2 - std::panicking::begin_panic::h93672d0313d5e8e9
   6:     0x556d719c9440 - std::panicking::begin_panic_fmt::hd0daa02942245d81
   7:     0x556d719c93c1 - rust_begin_unwind
   8:     0x556d719ffcbf - core::panicking::panic_fmt::hbfc935564d134c1b
   9:     0x556d71899f02 - core::result::unwrap_failed::h66f79b2edc69ddfd
                        at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/obj/../src/libcore/result.rs:29
  10:     0x556d718952cb - _<core..result..Result<T, E>>::unwrap::h49a140af593bc4fa
                        at /buildslave/rust-buildbot/slave/stable-dist-rustc-linux/build/obj/../src/libcore/result.rs:726
  11:     0x556d718a5e3d - dbrust::main::h24a50e631826915e
                        at /home/lastone817/dbrust/src/main.rs:87
  12:     0x556d719d1826 - __rust_maybe_catch_panic
  13:     0x556d719c8702 - std::rt::lang_start::h53bf99b0829cc03c
  14:     0x556d718a6b83 - main
  15:     0x7f40a0b5082f - __libc_start_main
  16:     0x556d7188d038 - _start
  17:                0x0 - <unknown>
error: Process didn't exit successfully: `target/debug/dbrust` (exit code: 101)

到目前为止,我发现的最佳解决方案是使用非阻塞。我在流上使用了 Mutex 并将其传递给两个线程。然后读取线程获取锁并调用read。如果没有消息,它会释放锁,以便写入线程可以使用该流。采用这种方式,读线程做忙等待,导致CPU占用100%。我认为这不是最佳解决方案。

是否有一种安全的方法来分离流的读写方面?

最佳答案

我使用 Rust 的 std::cell::UnsafeCell 将 SSL 流拆分为读取和写入部分.

extern crate native_tls;

use native_tls::TlsConnector;
use std::cell::UnsafeCell;
use std::error::Error;
use std::io::Read;
use std::io::Write;
use std::marker::Sync;
use std::net::TcpStream;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

struct UnsafeMutator<T> {
    value: UnsafeCell<T>,
}

impl<T> UnsafeMutator<T> {
    fn new(value: T) -> UnsafeMutator<T> {
        return UnsafeMutator {
            value: UnsafeCell::new(value),
        };
    }

    fn mut_value(&self) -> &mut T {
        return unsafe { &mut *self.value.get() };
    }
}

unsafe impl<T> Sync for UnsafeMutator<T> {}

struct ReadWrapper<R>
where
    R: Read,
{
    inner: Arc<UnsafeMutator<R>>,
}

impl<R: Read> Read for ReadWrapper<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        return self.inner.mut_value().read(buf);
    }
}
struct WriteWrapper<W>
where
    W: Write,
{
    inner: Arc<UnsafeMutator<W>>,
}

impl<W: Write> Write for WriteWrapper<W> {
    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
        return self.inner.mut_value().write(buf);
    }

    fn flush(&mut self) -> Result<(), std::io::Error> {
        return self.inner.mut_value().flush();
    }
}

pub struct Socket {
    pub output_stream: Arc<Mutex<Write + Send>>,
    pub input_stream: Arc<Mutex<Read + Send>>,
}

impl Socket {
    pub fn bind(host: &str, port: u16, secure: bool) -> Result<Socket, Box<Error>> {
        let tcp_stream = match TcpStream::connect((host, port)) {
            Ok(x) => x,
            Err(e) => return Err(Box::new(e)),
        };
        if secure {
            let tls_connector = TlsConnector::builder().build().unwrap();
            let tls_stream = match tls_connector.connect(host, tcp_stream) {
                Ok(x) => x,
                Err(e) => return Err(Box::new(e)),
            };
            let mutator = Arc::new(UnsafeMutator::new(tls_stream));
            let input_stream = Arc::new(Mutex::new(ReadWrapper {
                inner: mutator.clone(),
            }));
            let output_stream = Arc::new(Mutex::new(WriteWrapper { inner: mutator }));

            let socket = Socket {
                output_stream,
                input_stream,
            };
            return Ok(socket);
        } else {
            let mutator = Arc::new(UnsafeMutator::new(tcp_stream));
            let input_stream = Arc::new(Mutex::new(ReadWrapper {
                inner: mutator.clone(),
            }));
            let output_stream = Arc::new(Mutex::new(WriteWrapper { inner: mutator }));

            let socket = Socket {
                output_stream,
                input_stream,
            };
            return Ok(socket);
        }
    }
}

fn main() {
    let socket = Arc::new(Socket::bind("google.com", 443, true).unwrap());

    let socket_clone = Arc::clone(&socket);

    let reader_thread = thread::spawn(move || {
        let mut res = vec![];
        let mut input_stream = socket_clone.input_stream.lock().unwrap();
        input_stream.read_to_end(&mut res).unwrap();
        println!("{}", String::from_utf8_lossy(&res));
    });

    let writer_thread = thread::spawn(move || {
        let mut output_stream = socket.output_stream.lock().unwrap();
        output_stream.write_all(b"GET / HTTP/1.0\r\n\r\n").unwrap();
    });

    writer_thread.join().unwrap();
    reader_thread.join().unwrap();
}

关于multithreading - 将 TcpStream + SslStream 分离成读写组件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40233482/

相关文章:

vb.net - 如何在 VB.net 中指定 volatile 的等效项?

c++ - 具有pthread和锁且无提升的单读者多作者

c# - 何时应处置 ManualResetEvent?

ssl - 没有 Openssl 的 HTTP 公钥固定 (HPKP)

c++ - 当服务器速度慢时,写入客户端返回 EWOULDBLOCK

python - 在多个 pcaps 上使用 pynids

java - 单个程序如何与多个客户端共享? (意味着多个客户端线程共享相同的对象吗?)

amazon-web-services - 通配符子域@AWS route53 的 SSL 证书错误别名为 ELB

c# - 当调用通过 SSL 终结器时,DiscoveryClient.GetAsync 失败 “Issuer name does not match authority”

c - TCP 套接字,服务器无法响应客户端,接受 : Interrupted system call