multithreading - Simple server - threads stay open when trying to stream a file

Tags: multithreading, rust

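I'm quite new to Rust and I'm trying to learn it by working through www.rust-class.org. In one of the assignments I have to implement a simple web server. Most of the code on GitHub targets v0.9, so I had to rewrite some things. Anyway: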

The web server code is below, but I don't expect you to read all of it, so I have highlighted the part where the problem occurs.

use std::io::*;
use std::io::net::ip::SocketAddr;
use std::{str, os};
use std::sync::{Mutex, Arc, Semaphore};
use std::path::{Path, PosixPath};
use std::io::fs::PathExtensions;
use std::collections::{BinaryHeap, HashMap};
use std::io::timer::sleep;
use std::time::duration::Duration;

static CONTENT_TYPE_HTML: &'static str = "Content-Type: text/html; charset=UTF-8\r\n\r\n";
static HTTP_SUCCESS: &'static str = "HTTP/1.1 200 OK\r\n";
static HTTP_NOT_FOUND: &'static str = "HTTP/1.1 404 Not Found\r\n";

static START_COUNTER_STYLE: &'static str = "
  <!DOCTYPE html><html>
    <head>
    <title>Hello, Rust!</title>
    <style>
      body { background-color: #884414; color: #FFEEAA}
      h1 { font-size:2cm; text-align: center; color: black; text-shadow: 0 0 4mm red }
      h2 { font-size:2cm; text-align: center; color: black; text-shadow: 0 0 4mm green }
    </style>
    </head>
    <body>";
static END_COUNTER_STYLE: &'static str = "</body></html>\r\n";

static FILE_CHUNK: uint = 8192;
static MAX_CONCURRENCY: int = 4;

#[deriving(PartialEq, Eq)]
struct HTTPRequest {
    peer_name: SocketAddr,
    path: PosixPath,
    file_size: uint,
    priority: uint
}

impl PartialOrd for HTTPRequest {
    fn partial_cmp(&self, other: &HTTPRequest) -> Option<Ordering> {
        // Comparison is reversed to make BinaryHeap (a max-heap) behave like a min-heap
        (other.priority).partial_cmp(&self.priority)
    }
}

impl Ord for HTTPRequest {
    fn cmp(&self, other: &HTTPRequest) -> Ordering {
        self.partial_cmp(other).unwrap()
    }
}

pub struct WebServer {
    port: uint,
    ip_str: String,

    request_queue_arc: Arc<Mutex<BinaryHeap<HTTPRequest>>>,
    stream_map_arc: Arc<Mutex<HashMap<SocketAddr, Result<net::tcp::TcpStream, IoError>>>>,

    notify_sender: Sender<()>,
    notify_recv: Receiver<()>,

    www_dir_path: Path,

    concurrency_limit: Arc<Semaphore>,
}

impl WebServer {
    pub fn new(ip_str: String, port: uint, www_dir_str: String) -> WebServer {
        let (notify_sender, notify_recv) = channel();

        let www_dir_path = Path::new(www_dir_str);

        debug!("I'm serving server from directory: {}", www_dir_path.display());

        WebServer {
            ip_str: ip_str,
            port: port,
            www_dir_path: www_dir_path,

            request_queue_arc: Arc::new(Mutex::new(BinaryHeap::new())),
            stream_map_arc: Arc::new(Mutex::new(HashMap::new())),

            notify_sender: notify_sender,
            notify_recv: notify_recv,

            concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY))
        }
    }

    pub fn run(&mut self) {
        self.listen();
        self.dequeue_static_file_request();
    }

    pub fn listen(&mut self) {
        let addr = from_str::<SocketAddr>(format!("{}:{}", self.ip_str, self.port).as_slice()).expect("Address error.");

        let stream_map_arc = self.stream_map_arc.clone();
        let notify_sender = self.notify_sender.clone();
        let request_queue_arc = self.request_queue_arc.clone();
        let www_dir_path = self.www_dir_path.clone();

        spawn(proc(){
            let mut acceptor = net::tcp::TcpListener::bind(addr).listen();
            println!("Listening on {}", addr);
            let mut requests_counter: uint = 0;

            for stream in acceptor.incoming() {
                match stream.clone() {
                    Ok(mut res) => { res.set_timeout(Some(1000*10)); },
                    Err(why) => { panic!("Couldn't set timeout for stream: {}", why.desc); }
                }
                requests_counter += 1;

                let stream_map_arc = stream_map_arc.clone();
                let notify_sender = notify_sender.clone();
                let request_queue_arc = request_queue_arc.clone();
                let www_dir_path = www_dir_path.clone();

                spawn(proc() {
                    let mut stream = stream;
                    let mut buf = [0, ..500];
                    stream.read(&mut buf);
                    let request_str = str::from_utf8(buf.as_slice());

                    debug!("Request:\n{}", request_str);

                    let peer_name: SocketAddr = WebServer::peer_name(stream.clone());

                    match WebServer::get_request_path(www_dir_path.clone(), buf) {
                        Ok(request_path) => {
                            let extension = match request_path.extension_str() {
                                Some(ext) => ext,
                                None => ""
                            };

                            debug!("Requested path :\n{}", request_path.as_str());
                            debug!("Extension :\n{}",extension);

                            if request_path.as_str().expect("Request path err") == "www" {
                                debug!("===== Counter Page request =====");
                                WebServer::respond_with_counter_page(stream, requests_counter);
                                debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
                            } else if request_path.is_file() && (extension == "html" || extension == "bin") {
                                debug!("===== Static page request =====");
                                WebServer::enqueue_static_file_request(
                                    stream,
                                    request_path.clone(),
                                    peer_name,
                                    stream_map_arc,
                                    request_queue_arc,
                                    notify_sender
                                );
                            } else if request_path.is_file() && extension == "shtml" {
                                debug!("===== Dynamic page request =====");
                                // WebServer::respond_with_dynamic_page(stream, request_path.clone());
                                debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
                            } else {
                                debug!("===== Respond with error page =====");
                                WebServer::respond_with_error_page(stream);
                                debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
                            }
                        },
                        Err(_) => {
                            debug!("===== Respond with error page =====");
                            WebServer::respond_with_error_page(stream);
                            debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
                        }
                    }
                })
            }
        });
    }

    fn respond_with_counter_page(stream: Result<net::tcp::TcpStream, IoError>, requests_counter: uint) {
        WebServer::force_write(stream.clone(), format!("{}{}{}Requests:{}{}",
            HTTP_SUCCESS,
            CONTENT_TYPE_HTML,
            START_COUNTER_STYLE,
            requests_counter,
            END_COUNTER_STYLE
        ).as_bytes())
    }

    fn respond_with_error_page(stream: Result<net::tcp::TcpStream, IoError>) {
        WebServer::force_write(stream.clone(), HTTP_NOT_FOUND.as_bytes());
    }

    // fn respond_with_dynamic_page(stream: Result<net::tcp::TcpStream, IoError>, request_path: Path) {
    //     // WebServer::respond_with_static_page(stream, request_path);
    // }

    // TODO: Application-layer file caching.
    fn respond_with_static_page(stream: net::tcp::TcpStream, request_path: Path) {
        let mut stream = stream;
        let mut file = match File::open(&request_path) {
            Err(why) => {
                debug!("File couldn't be opened because: {} kind: {}", why.desc, why.kind);
                return;
            },
            Ok(f) => { f }
        };

        stream.write(HTTP_SUCCESS.as_bytes());
        stream.write(CONTENT_TYPE_HTML.as_bytes());

        loop {
            let mut buf = vec!();
            match file.push_at_least(FILE_CHUNK, FILE_CHUNK, &mut buf) {
                Err(why) => {
                    debug!("File reading problem: {}, {}", why.kind, why.desc);
                    if buf.len() > 0 {
                        stream.write(buf.as_slice());
                    }
                    return;
                },
                Ok(read_bytes_size) => {
                    match stream.write(buf.as_slice()) {
                        Err(why) => {
                            debug!("Stream broken: desc: {}, kind: {}", why.desc, why.kind);
                            return;
                        },
                        Ok(_) => {}
                    }
                }
            }
        }
    }

    fn enqueue_static_file_request(
        stream: Result<net::tcp::TcpStream, IoError>,
        request_path: Path,
        peer_name: SocketAddr,

        stream_map_arc: Arc<Mutex<HashMap<SocketAddr, Result<net::tcp::TcpStream, IoError>>>>,
        request_queue_arc: Arc<Mutex<BinaryHeap<HTTPRequest>>>,
        notify_sender: Sender<()>,
    ) {
        debug!("Enqueuing static file, waiting for streams lock... to stream: {}", request_path.display());
        let mut local_map = stream_map_arc.lock();
        local_map.insert(peer_name.clone(), stream);

        debug!("Enqueuing static file, waiting for requests lock...");
        let mut local_req = request_queue_arc.lock();


        local_req.push(
            HTTPRequest {
                peer_name: peer_name.clone(),
                path: request_path.clone(),
                file_size: 1,
                priority: 1
            });

        println!("enqueue_static_file_request: request_queue_arc length: {}", local_req.len());
        println!("enqueue_static_file_request: stream_map_arc length: {}", local_map.len());

        notify_sender.send(());
    }

    fn dequeue_static_file_request(&mut self) {
        let stream_map_arc = self.stream_map_arc.clone();
        let request_queue_arc = self.request_queue_arc.clone();

        loop {
            debug!("Waiting for requests!");
            self.notify_recv.recv();
            debug!("Dequeuing static file, waiting for requests lock...");

            let mut local_req = request_queue_arc.lock();

            match local_req.pop() {
                Some(request) => {
                    println!("dequeue_static_file_request, request_queue_arc length: {}, {}",
                        local_req.len(),
                        &request.path.display()
                    );
                    debug!("Dequeuing static file, waiting for streams lock...");
                    let mut local_map = stream_map_arc.lock();
                    println!("dequeue_static_file_request, stream_map_arc length: {}", local_map.len());

                    match local_map.remove(&request.peer_name) {
                        None => { },
                        Some(stream) => match stream {
                                Ok(res) => {
                                    self.concurrency_limit.acquire();
                                    let child_concurrency_limit = self.concurrency_limit.clone();
                                    let res = res.clone();
                                    spawn(proc(){
                                        WebServer::respond_with_static_page(res, request.path);
                                        child_concurrency_limit.release();
                                        debug!("=====Terminated connection from [{}:{}].=====",
                                            &request.peer_name.ip, &request.peer_name.port
                                        );
                                    });
                                },

                                Err(_) => {
                                    debug!("Stream had broken in the meantime.");
                                }
                            }
                    }
                }
                None => {}
            }
        }
    }

    fn peer_name(stream: Result<net::tcp::TcpStream, IoError>) -> net::ip::SocketAddr {
        match stream {
            Err(_) => panic!("Stream broken @ peer_name"),
            Ok(res) => {
                match res.clone().peer_name() {
                    Ok(addr) => addr,
                    Err(_) => panic!("Couldn't obtain peername from stream")
                }
            }
        }
    }

    fn get_request_path(root: Path, buf: [u8, ..500]) -> Result<Path, &'static str> {
        match str::from_utf8(buf.as_slice()) {
            Some(request_str) => {
                let request_headers: Vec<&str> = request_str.splitn(3, ' ').collect();
                if request_headers.len() == 4 {
                    Ok(root.join(Path::new(format!("./{}", request_headers[1]))))
                } else {
                    Err("Bad headers")
                }
            },
            None => {
                Err("Empty headers")
            }
        }
    }

    fn force_write(stream: Result<net::tcp::TcpStream, IoError>, content: &[u8]) {
        let mut stream = stream;

        match stream.as_mut() {
            Err(_) => { debug!("Well. I wanted to safely write to a... BROKEN stream."); },
            Ok(res) => {
                // Write errors are ignored here; the peer may already be gone.
                match res.write(content) {
                    Err(_) => { },
                    Ok(_) => { }
                }
            }
        }
    }
}

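The problem is that when I try to stream a file, the threads stay open (because the streams aren't released?) and RAM usage slowly increases. I've narrowed it down to the streaming part: for example, if I put sleep(Duration::seconds(2)) there and don't try to stream the file, the queue grows and then shrinks back to zero over time. When I try to stream the file, that doesn't happen.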

Clean state:

[image: Clean state]

After running httperf:

[image: After running httperf]

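And subsequent requests wait forever for the streams lock. I've tried looking for an infinite loop somewhere, without success; everything seems to run fine.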

Do you have any suggestions as to what could cause this behaviour?
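One detail in the code above may be related to the wait on the streams lock: dequeue_static_file_request still holds both the request-queue lock and the stream-map lock when it calls self.concurrency_limit.acquire(). If all permits are taken, every call to enqueue_static_file_request has to wait for those locks until a worker finishes. The following is only a sketch in current Rust (not the pre-1.0 dialect used in the question) of taking the next request while holding each lock as briefly as possible. The name `take_next`, the `u64` peer id, and the `String` standing in for the stream are hypothetical, chosen for illustration.

```rust
use std::collections::{BinaryHeap, HashMap};
use std::sync::Mutex;

/// Pops the highest-priority request and removes its stream from the map.
/// Hypothetical helper: requests are (priority, peer id) pairs and a String
/// stands in for the TCP stream.
/// Each lock guard is a temporary that is dropped at the end of its own
/// statement, so no lock is held when this function returns.
fn take_next(
    queue: &Mutex<BinaryHeap<(u32, u64)>>,
    streams: &Mutex<HashMap<u64, String>>,
) -> Option<(u64, String)> {
    // Queue lock is held only for the duration of this statement.
    let (_priority, peer) = queue.lock().unwrap().pop()?;
    // Stream-map lock is held only for the duration of this statement.
    let stream = streams.lock().unwrap().remove(&peer)?;
    Some((peer, stream))
}
```

With this shape, the caller would wait for a free worker slot only after `take_next` has returned, so threads that enqueue new requests are not blocked while the limit is exhausted.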

Best answer

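Assuming the file's contents have been loaded into body, you are missing the "Content-Length" header; alternatively, you can obtain the length some other way. Without it, a client cannot tell where the response body ends until the connection is closed. The header has to be written before the blank line that ends the header section, and CONTENT_TYPE_HTML already ends with that blank line:

    stream.write(HTTP_SUCCESS.as_bytes());
    stream.write(format!("Content-Length: {}\r\n", body.len()).as_bytes()).unwrap();
    stream.write(CONTENT_TYPE_HTML.as_bytes());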
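For readers on current Rust, here is a minimal sketch of the same idea: send Content-Length before the blank line, then stream the body in fixed-size chunks. It is written against std::io::Read and std::io::Write rather than the pre-1.0 API used in the question, and `write_response` and its parameters are hypothetical names, not part of the original server.

```rust
use std::io::{self, Read, Write};

/// Chunk size for copying the body (mirrors FILE_CHUNK in the question).
const FILE_CHUNK: usize = 8192;

/// Writes the status line, the headers (including Content-Length), the blank
/// line, and then the body in chunks. Returns the number of body bytes sent.
/// Hypothetical helper for illustration only.
fn write_response<W: Write, R: Read>(
    out: &mut W,
    body: &mut R,
    body_len: u64,
) -> io::Result<u64> {
    write!(out, "HTTP/1.1 200 OK\r\n")?;
    write!(out, "Content-Length: {}\r\n", body_len)?;
    // The trailing empty line ends the header section.
    write!(out, "Content-Type: text/html; charset=UTF-8\r\n\r\n")?;

    let mut buf = [0u8; FILE_CHUNK];
    let mut sent: u64 = 0;
    loop {
        let n = match body.read(&mut buf) {
            Ok(0) => break, // end of body
            Ok(n) => n,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        out.write_all(&buf[..n])?;
        sent += n as u64;
    }
    out.flush()?;
    Ok(sent)
}
```

For a file on disk, the length can be taken from `file.metadata()?.len()` before streaming, so the whole file never has to be loaded into memory; a `TcpStream` can be passed as `out` because it implements `Write`.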

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/27136725/
