在本文之前,我們用Rust實現一個單執行緒的web server的例子,但是單執行緒的web server不夠高效,所以本篇文章就來實現一個多執行緒的例子。
單執行緒web server存在的問題
請求只能序列處理,也就是說當第一個連結處理完之前不會處理第二個連結。考慮如下例子:
use std::net::;
use std::io::;
use std::fs;
use std::;
fn handle_client(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream。read(&mut buffer)。unwrap();
let get = b“GET / HTTP/1。1\r\n”;
let (status_line, filename) = if buffer。starts_with(get) {
(“HTTP/1。1 200 OK\r\n\r\n”, “main。html”)
} else {
(“HTTP/1。1 404 NOT FOUND\r\n\r\n”, “404。html”)
};
let contents = fs::read_to_string(filename)。unwrap();
let response = format!(“{}{}”, status_line, contents);
stream。write(response。as_bytes())。unwrap();
stream。flush()。unwrap();
let ten_millis = time::Duration::from_millis(10000);
thread::sleep(ten_millis);//睡眠一段時間,模擬處理時間很長
}
fn main() -> std::io::Result {
let listener = TcpListener::bind(“127。0。0。1:8080”)? ;
for stream in listener。incoming() {
handle_client(stream?);
}
Ok(())
}
在瀏覽器中開啟兩個視窗,分別輸入127。0。0。1:8080,會發現在第一個處理完之前,第二個不會響應。
使用多執行緒來解決問題
解決方式
修改main函式程式碼:
fn main() -> std::io::Result {
let listener = TcpListener::bind(“127。0。0。1:8080”)?;
let mut thread_vec: Vec> = Vec::new();
for stream in listener。incoming() {
// handle_client(stream?);
let stream = stream。unwrap();
let handle = thread::spawn(|| {
handle_client(stream);
});
thread_vec。push(handle);
}
for handle in thread_vec {
handle。join()。unwrap();
}
Ok(())
}
從瀏覽器開啟兩個標籤,進行測試,可以發現第一個沒有處理完之前,第二個請求已經開始處理。
存在問題
當存在海量請求時,系統也會跟著建立海量的執行緒,最終造成系統崩潰。
使用執行緒池來解決問題
執行緒池
知識點
多執行緒、管道。
從主執行緒將任務傳送到管道,工作執行緒等待在管道的接收端,當收到任務時,進行處理。
執行緒池方式實現
1、初步設計
定義ThreadPool結構
use std::thread;
pub struct ThreadPool {
thread: Vec>,
}
定義ThreadPool的方法
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
//——snip——
}
pub fn execute()
//pub fn execute(&self, f: F)
// where
// F: FnOnce() + Send + ‘static
{
//——snip——
}
}
下面我們考慮new函式,可能的實現是這樣
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0。。size {
//建立執行緒:
//問題來了,建立執行緒的時候需要傳入閉包,也就是具體做的動作,
//可是這個時候我們還沒有具體的任務,怎麼辦?
}
ThreadPool {
threads
}
}
execute函式
//設計execute的函式,可以參考thread::spawn
pub fn execute(&self, f: F)
where
F: FnOnce() + Send + ’static
{
}
初步設計的問題總結:
主要是在建立執行緒池的new函式中,需要傳入具體的任務,可是此時還沒有具體的任務,如何解決?
2、解決執行緒建立的問題
重新定義ThreadPool結構體
pub struct ThreadPool {
workers: Vec,
}
ThreadPool的new方法
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0。。size {
workers。push(Worker::new(id));
}
ThreadPool {
workers
}
}
在worker中建立執行緒
struct Worker {
id: usize,
thread: thread::JoinHandle,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {
id,
thread,
}
}
}
3、傳送任務
進一步將ThreadPool結構設計為
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec,
sender: mpsc::Sender,
}
struct Job;
完善new方法
impl ThreadPool {
// ——snip——
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();//add
let mut workers = Vec::with_capacity(size);
for id in 0。。size {
//workers。push(Worker::new(id));
workers。push(Worker::new(id, receiver));
}
ThreadPool {
workers,
sender,//add
}
}
// ——snip——
}
//——snip——
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker {
id,
thread,
}
}
}
此段程式碼錯誤,因為receiver要線上程間傳遞,但是是非執行緒安全的。因此應該使用Arc>。重新撰寫new方法如下:
use std::sync::Arc;
use std::sync::Mutex;
// ——snip——
impl ThreadPool {
// ——snip——
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));//add
let mut workers = Vec::with_capacity(size);
for id in 0。。size {
workers。push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
// ——snip——
}
impl Worker {
fn new(id: usize, receiver: Arc>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver。lock()。unwrap()。recv()。unwrap();
println!(“Worker {} got a job; executing。”, id);
job();
}
});
Worker {
id,
thread,
}
}
}
實現execute方法
type Job = Box;//修改Job為trait物件的類別名稱
impl ThreadPool {
// ——snip——
pub fn execute(&self, f: F)
where
F: FnOnce() + Send + ‘static
{
let job = Box::new(f);
self。sender。send(job)。unwrap();
}
}
完整程式碼
src/main。rs
use std::fs;
use std::io::;
use std::net::;
use std::;
use mylib::ThreadPool;
fn handle_client(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream。read(&mut buffer)。unwrap();
let get = b“GET / HTTP/1。1\r\n”;
let (status_line, filename) = if buffer。starts_with(get) {
(“HTTP/1。1 200 OK\r\n\r\n”, “main。html”)
} else {
(“HTTP/1。1 404 NOT FOUND\r\n\r\n”, “404。html”)
};
let contents = fs::read_to_string(filename)。unwrap();
let response = format!(“{}{}”, status_line, contents);
stream。write(response。as_bytes())。unwrap();
stream。flush()。unwrap();
let ten_millis = time::Duration::from_millis(10000);
thread::sleep(ten_millis);
}
fn main() -> std::io::Result {
let listener = TcpListener::bind(“127。0。0。1:8080”)?;
// let mut thread_vec: Vec> = Vec::new();
let pool = ThreadPool::new(4);
for stream in listener。incoming() {
// // handle_client(stream?);
let stream = stream。unwrap();
// let handle = thread::spawn(|| {
// handle_client(stream);
// });
// thread_vec。push(handle);
pool。execute(|| {
handle_client(stream);
});
}
// for handle in thread_vec {
// handle。join()。unwrap();
// }
Ok(())
}
src/mylib/lib。rs
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
struct Worker {
id: usize,
thread: thread::JoinHandle,
}
impl Worker {
// fn new(id: usize) -> Worker {
// let thread = thread::spawn(|| {});
// Worker {
// id,
// thread,
// }
// }
// fn new(id: usize, receiver: mpsc::Receiver) -> Worker {
// let thread = thread::spawn(|| {
// receiver;
// });
// Worker {
// id,
// thread,
// }
// }
fn new(id: usize, receiver: Arc>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver。lock()。unwrap()。recv()。unwrap();
println!(“Worker {} got a job; executing。”, id);
job();
}
});
Worker {
id,
thread,
}
}
}
pub struct ThreadPool {
workers: Vec,
sender: mpsc::Sender,
}
// struct Job;
type Job = Box;//修改Job為trait物件的類別名稱
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
// let mut threads = Vec::with_capacity(size);
// for _ in 0。。size {
// //建立執行緒:
// //問題來了,建立執行緒的時候需要傳入閉包,也就是具體做的動作,
// //可是這個時候我們還沒有具體的任務,怎麼辦?
// }
// ThreadPool {
// threads
// }
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0。。size {
//workers。push(Worker::new(id));
//workers。push(Worker::new(id, receiver));
workers。push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute(&self, f: F)
where
F: FnOnce() + Send + ’static
{
let job = Box::new(f);
self。sender。send(job)。unwrap();
}
}
在main的Cargo。toml新增如下依賴:
[dependencies]
mylib =
當前版本存在的問題
執行緒池中的執行緒怎麼結束?
想知道如何解決這個問題,請關注令狐一衝,下回為您分解。