我对 rust 还很陌生,我正在尝试通过做 www.rust-class.org 来学习它。 .在其中一项作业中,我必须实现简单的 Web 服务器。 github 上的大部分代码都是针对 v0.9 的,所以我不得不重写一些东西。无论如何:
网络服务器代码在下面,但我不希望您阅读下面的所有内容,所以我在问题发生时突出显示了部分。
use std::io::*;
use std::io::net::ip::SocketAddr;
use std::{str, os};
use std::sync::{Mutex, Arc, Semaphore};
use std::path::{Path, PosixPath};
use std::io::fs::PathExtensions;
use std::collections::{BinaryHeap, HashMap};
use std::io::timer::sleep;
use std::time::duration::Duration;
static CONTENT_TYPE_HTML: &'static str = "Content-Type: text/html; charset=UTF-8\r\n\r\n";
static HTTP_SUCCESS: &'static str = "HTTP/1.1 200 OK\r\n";
static HTTP_NOT_FOUND: &'static str = "HTTP/1.1 404 OK\r\n";
static START_COUNTER_STYLE: &'static str = "
<doctype !html><html>
<head>
<title>Hello, Rust!</title>
<style>
body { background-color: #884414; color: #FFEEAA}
h1 { font-size:2cm; text-align: center; color: black; text-shadow: 0 0 4mm red }
h2 { font-size:2cm; text-align: center; color: black; text-shadow: 0 0 4mm green }
</style>
</head>
<body>";
static END_COUNTER_STYLE: &'static str = "</body></html>\r\n";
static FILE_CHUNK: uint = 8192;
static MAX_CONCURRENCY: int = 4;
#[deriving(PartialEq, Eq)]
struct HTTPRequest {
peer_name: SocketAddr,
path: PosixPath,
file_size: uint,
priority: uint
}
impl PartialOrd for HTTPRequest {
fn partial_cmp(&self, other: &HTTPRequest) -> Option<Ordering> {
// Comparison is reversed to make PriorityQueue behave like a min-heap
(self.priority).partial_cmp(&other.priority)
}
}
impl Ord for HTTPRequest {
fn cmp(&self, other: &HTTPRequest) -> Ordering {
self.partial_cmp(other).unwrap()
}
}
pub struct WebServer {
port: uint,
ip_str: String,
request_queue_arc: Arc<Mutex<BinaryHeap<HTTPRequest>>>,
stream_map_arc: Arc<Mutex<HashMap<SocketAddr, Result<net::tcp::TcpStream, IoError>>>>,
notify_sender: Sender<()>,
notify_recv: Receiver<()>,
www_dir_path: Path,
concurrency_limit: Arc<Semaphore>,
}
impl WebServer {
pub fn new(ip_str: String, port: uint, www_dir_str: String) -> WebServer {
let (notify_sender, notify_recv) = channel();
let www_dir_path = Path::new(www_dir_str);
debug!("I'm serving server from directory: {}", www_dir_path.display());
WebServer {
ip_str: ip_str,
port: port,
www_dir_path: www_dir_path,
request_queue_arc: Arc::new(Mutex::new(BinaryHeap::new())),
stream_map_arc: Arc::new(Mutex::new(HashMap::new())),
notify_sender: notify_sender,
notify_recv: notify_recv,
concurrency_limit: Arc::new(Semaphore::new(MAX_CONCURRENCY))
}
}
pub fn run(&mut self) {
self.listen();
self.dequeue_static_file_request();
}
pub fn listen(&mut self) {
let addr = from_str::<SocketAddr>(format!("{}:{}", self.ip_str, self.port).as_slice()).expect("Address error.");
let stream_map_arc = self.stream_map_arc.clone();
let notify_sender = self.notify_sender.clone();
let request_queue_arc = self.request_queue_arc.clone();
let www_dir_path = self.www_dir_path.clone();
spawn(proc(){
let mut acceptor = net::tcp::TcpListener::bind(addr).listen();
println!("Listening on {}", addr);
let mut requests_counter: uint = 0;
for stream in acceptor.incoming() {
match stream.clone() {
Ok(mut res) => { res.set_timeout(Some(1000*10)); },
Err(why) => { panic!("Couldn't set timout for stream: {}", why.desc); }
}
requests_counter += 1;
let stream_map_arc = stream_map_arc.clone();
let notify_sender = notify_sender.clone();
let request_queue_arc = request_queue_arc.clone();
let www_dir_path = www_dir_path.clone();
spawn(proc() {
let mut stream = stream;
let mut buf = [0, ..500];
stream.read(&mut buf);
let request_str = str::from_utf8(buf.as_slice());
debug!("Request:\n{}", request_str);
let peer_name: SocketAddr = WebServer::peer_name(stream.clone());
match WebServer::get_request_path(www_dir_path.clone(), buf) {
Ok(request_path) => {
let extension = match request_path.extension_str() {
Some(ext) => ext,
None => ""
};
debug!("Requested path :\n{}", request_path.as_str());
debug!("Extension :\n{}",extension);
if request_path.as_str().expect("Request path err") == "www" {
debug!("===== Counter Page request =====");
WebServer::respond_with_counter_page(stream, requests_counter);
debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
} else if request_path.is_file() && (extension == "html" || extension == "bin") {
debug!("===== Static page request =====");
WebServer::enqueue_static_file_request(
stream,
request_path.clone(),
peer_name,
stream_map_arc,
request_queue_arc,
notify_sender
);
} else if request_path.is_file() && extension == "html" {
debug!("===== Dynamic page request =====");
// WebServer::respond_with_dynamic_page(stream, request_path.clone());
debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
} else {
debug!("===== Respond with error page =====");
WebServer::respond_with_error_page(stream);
debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
}
},
Err(_) => {
debug!("===== Respond with error page =====");
WebServer::respond_with_error_page(stream);
debug!("=====Terminated connection from [{}:{}].=====", peer_name.ip, peer_name.port);
}
}
})
}
});
}
fn respond_with_counter_page(stream: Result<net::tcp::TcpStream, IoError>, requests_counter: uint) {
WebServer::force_write(stream.clone(), format!("{}{}{}Requests:{}{}",
HTTP_SUCCESS,
CONTENT_TYPE_HTML,
START_COUNTER_STYLE,
requests_counter,
END_COUNTER_STYLE
).as_bytes())
}
fn respond_with_error_page(stream: Result<net::tcp::TcpStream, IoError>) {
WebServer::force_write(stream.clone(), HTTP_NOT_FOUND.as_bytes());
}
// fn respond_with_dynamic_page(stream: Result<net::tcp::TcpStream, IoError>, request_path: Path) {
// // WebServer::respond_with_static_page(stream, request_path);
// }
// TODO: Application-layer file caching.
fn respond_with_static_page(stream: net::tcp::TcpStream, request_path: Path) {
let mut stream = stream;
let mut file = match File::open(&request_path) {
Err(why) => {
debug!("File couln't be opened because: {} kind: {}", why.desc, why.kind);
return;
},
Ok(f) => { f }
};
stream.write(HTTP_SUCCESS.as_bytes());
stream.write(CONTENT_TYPE_HTML.as_bytes());
loop {
let mut buf = vec!();
match file.push_at_least(FILE_CHUNK, FILE_CHUNK, &mut buf) {
Err(why) => {
debug!("File reading problem: {}, {}", why.kind, why.desc)
if buf.len() > 0 {
stream.write(buf.as_slice());
}
return;
},
Ok(read_bytes_size) => {
match stream.write(buf.as_slice()) {
Err(why) => {
debug!("Stream broken: desc: {}, kind: {}", why.desc, why.kind);
return;
},
Ok(_) => {}
}
}
}
}
}
fn enqueue_static_file_request(
stream: Result<net::tcp::TcpStream, IoError>,
request_path: Path,
peer_name: SocketAddr,
stream_map_arc: Arc<Mutex<HashMap<SocketAddr, Result<net::tcp::TcpStream, IoError>>>>,
request_queue_arc: Arc<Mutex<BinaryHeap<HTTPRequest>>>,
notify_sender: Sender<()>,
) {
debug!("Enqueuing static file, waiting for streams lock... to stream: {}", request_path.display());
let mut local_map = stream_map_arc.lock();
local_map.insert(peer_name.clone(), stream);
debug!("Enqueuing static file, waiting for requests lock...");
let mut local_req = request_queue_arc.lock();
local_req.push(
HTTPRequest {
peer_name: peer_name.clone(),
path: request_path.clone(),
file_size: 1,
priority: 1
});
println!("enqueue_static_file_request: request_queue_arc length: {}", local_req.len());
println!("enqueue_static_file_request: stream_map_arc length: {}", local_map.len());
notify_sender.send(());
}
fn dequeue_static_file_request(&mut self) {
let stream_map_arc = self.stream_map_arc.clone();
let request_queue_arc = self.request_queue_arc.clone();
loop {
debug!("Waiting for requests!");
self.notify_recv.recv();
debug!("Dequeuing static file, waiting for requests lock...");
let mut local_req = request_queue_arc.lock();
match local_req.pop() {
Some(request) => {
println!("dequeue_static_file_request, request_queue_arc length: {}, {}",
local_req.len(),
&request.path.display()
);
debug!("Dequeuing static file, waiting for streams lock...");
let mut local_map = stream_map_arc.lock();
println!("dequeue_static_file_request, stream_map_arc length: {}", local_map.len());
match local_map.remove(&request.peer_name) {
None => { },
Some(stream) => match stream {
Ok(res) => {
self.concurrency_limit.acquire();
let child_concurrency_limit = self.concurrency_limit.clone();
let res = res.clone();
spawn(proc(){
WebServer::respond_with_static_page(res, request.path);
child_concurrency_limit.release();
debug!("=====Terminated connection from [{}:{}].=====",
&request.peer_name.ip, &request.peer_name.port
);
});
},
Err(_) => {
debug!("Stream had broken in the meantime.");
}
}
}
}
None => {}
}
}
}
fn peer_name(stream: Result<net::tcp::TcpStream, IoError>) -> net::ip::SocketAddr {
match stream {
Err(_) => panic!("Stream broken @ peer_name"),
Ok(res) => {
match res.clone().peer_name() {
Ok(addr) => addr,
Err(_) => panic!("Couldn't obtain peername from stream")
}
}
}
}
fn get_request_path(root: Path, buf: [u8, ..500]) -> Result<Path, &'static str> {
match str::from_utf8(buf.as_slice()) {
Some(request_str) => {
let request_headers: Vec<&str> = request_str.splitn(3, ' ').collect();
if request_headers.len() == 4 {
Ok(root.join(Path::new(format!("./{}", request_headers[1]))))
} else {
Err("Bad headers")
}
},
None => {
Err("Empty headers")
}
}
}
fn force_write(stream: Result<net::tcp::TcpStream, IoError>, content: &[u8]) {
let mut stream = stream;
match stream.as_mut() {
Err(_) => { debug!("Well. I wanted to safetly write to a... BROKEN stream."); },
Ok(res) => {
// ?? wtf
match res.write(content) {
Err(_) => { },
Ok(_) => { }
}
}
}
}
}
问题是,当我尝试流式传输文件时,线程(由于未释放流?)处于打开状态并且 RAM 使用量正在缓慢增加。我已经将它缩小到流媒体部分,因为例如如果我将 sleep(Duration::seconds(2))
并且不尝试流式传输文件,队列将随着时间的推移而增长回到零。当我尝试流式传输文件时,不会发生这种情况。
清洁状态:
运行 httpref 后:
并且 future 的请求永远等待流锁定。我曾尝试在某处寻找无限循环但没有成功 - 一切似乎都运行良好。
您有什么可能导致此类行为的建议吗?
最佳答案
假设文件的内容已加载到 body
中,则您缺少“Content-Length” header ,或者您可以通过其他方式尝试获取 len
stream.write(HTTP_SUCCESS.as_bytes());
stream.write(CONTENT_TYPE_HTML.as_bytes());
stream.write(format!("Content-length: {}", body.len())).unwrap();
关于multithreading - 简单服务器 - 如果尝试流式传输文件,线程将保持打开状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27136725/