用Rust實現一個多執行緒的web server

在本文之前,我們用Rust實現一個單執行緒的web server的例子,但是單執行緒的web server不夠高效,所以本篇文章就來實現一個多執行緒的例子。

單執行緒web server存在的問題

請求只能序列處理,也就是說當第一個連結處理完之前不會處理第二個連結。考慮如下例子:

use std::net::;

use std::io::;

use std::fs;

use std::;

fn handle_client(mut stream: TcpStream) {

let mut buffer = [0; 512];

stream。read(&mut buffer)。unwrap();

let get = b“GET / HTTP/1。1\r\n”;

let (status_line, filename) = if buffer。starts_with(get) {

(“HTTP/1。1 200 OK\r\n\r\n”, “main。html”)

} else {

(“HTTP/1。1 404 NOT FOUND\r\n\r\n”, “404。html”)

};

let contents = fs::read_to_string(filename)。unwrap();

let response = format!(“{}{}”, status_line, contents);

stream。write(response。as_bytes())。unwrap();

stream。flush()。unwrap();

let ten_millis = time::Duration::from_millis(10000);

thread::sleep(ten_millis);//睡眠一段時間,模擬處理時間很長

}

fn main() -> std::io::Result {

let listener = TcpListener::bind(“127。0。0。1:8080”)? ;

for stream in listener。incoming() {

handle_client(stream?);

}

Ok(())

}

在瀏覽器中開啟兩個視窗,分別輸入127。0。0。1:8080,會發現在第一個處理完之前,第二個不會響應。

使用多執行緒來解決問題

解決方式

修改main函式程式碼:

fn main() -> std::io::Result {

let listener = TcpListener::bind(“127。0。0。1:8080”)?;

let mut thread_vec: Vec> = Vec::new();

for stream in listener。incoming() {

// handle_client(stream?);

let stream = stream。unwrap();

let handle = thread::spawn(|| {

handle_client(stream);

});

thread_vec。push(handle);

}

for handle in thread_vec {

handle。join()。unwrap();

}

Ok(())

}

從瀏覽器開啟兩個標籤,進行測試,可以發現第一個沒有處理完之前,第二個請求已經開始處理。

存在問題

當存在海量請求時,系統也會跟著建立海量的執行緒,最終造成系統崩潰。

使用執行緒池來解決問題

執行緒池

用Rust實現一個多執行緒的web server

知識點

多執行緒、管道。

從主執行緒將任務傳送到管道,工作執行緒等待在管道的接收端,當收到任務時,進行處理。

執行緒池方式實現

1、初步設計

定義ThreadPool結構

use std::thread;

pub struct ThreadPool {

thread: Vec>,

}

定義ThreadPool的方法

impl ThreadPool {

pub fn new(size: usize) -> ThreadPool {

//——snip——

}

pub fn execute()

//pub fn execute(&self, f: F)

// where

// F: FnOnce() + Send + ‘static

{

//——snip——

}

}

下面我們考慮new函式,可能的實現是這樣

pub fn new(size: usize) -> ThreadPool {

assert!(size > 0);

let mut threads = Vec::with_capacity(size);

for _ in 0。。size {

//建立執行緒:

//問題來了,建立執行緒的時候需要傳入閉包,也就是具體做的動作,

//可是這個時候我們還沒有具體的任務,怎麼辦?

}

ThreadPool {

threads

}

}

execute函式

//設計execute的函式,可以參考thread::spawn

pub fn execute(&self, f: F)

where

F: FnOnce() + Send + ’static

{

}

初步設計的問題總結:

主要是在建立執行緒池的new函式中,需要傳入具體的任務,可是此時還沒有具體的任務,如何解決?

2、解決執行緒建立的問題

重新定義ThreadPool結構體

pub struct ThreadPool {

workers: Vec,

}

ThreadPool的new方法

pub fn new(size: usize) -> ThreadPool {

assert!(size > 0);

let mut workers = Vec::with_capacity(size);

for id in 0。。size {

workers。push(Worker::new(id));

}

ThreadPool {

workers

}

}

在worker中建立執行緒

struct Worker {

id: usize,

thread: thread::JoinHandle,

}

impl Worker {

fn new(id: usize) -> Worker {

let thread = thread::spawn(|| {});

Worker {

id,

thread,

}

}

}

3、傳送任務

進一步將ThreadPool結構設計為

use std::sync::mpsc;

pub struct ThreadPool {

workers: Vec,

sender: mpsc::Sender,

}

struct Job;

完善new方法

impl ThreadPool {

// ——snip——

pub fn new(size: usize) -> ThreadPool {

assert!(size > 0);

let (sender, receiver) = mpsc::channel();//add

let mut workers = Vec::with_capacity(size);

for id in 0。。size {

//workers。push(Worker::new(id));

workers。push(Worker::new(id, receiver));

}

ThreadPool {

workers,

sender,//add

}

}

// ——snip——

}

//——snip——

impl Worker {

fn new(id: usize, receiver: mpsc::Receiver) -> Worker {

let thread = thread::spawn(|| {

receiver;

});

Worker {

id,

thread,

}

}

}

此段程式碼錯誤,因為receiver要線上程間傳遞,但是是非執行緒安全的。因此應該使用Arc>。重新撰寫new方法如下:

use std::sync::Arc;

use std::sync::Mutex;

// ——snip——

impl ThreadPool {

// ——snip——

pub fn new(size: usize) -> ThreadPool {

assert!(size > 0);

let (sender, receiver) = mpsc::channel();

let receiver = Arc::new(Mutex::new(receiver));//add

let mut workers = Vec::with_capacity(size);

for id in 0。。size {

workers。push(Worker::new(id, Arc::clone(&receiver)));

}

ThreadPool {

workers,

sender,

}

}

// ——snip——

}

impl Worker {

fn new(id: usize, receiver: Arc>>) -> Worker {

let thread = thread::spawn(move || {

loop {

let job = receiver。lock()。unwrap()。recv()。unwrap();

println!(“Worker {} got a job; executing。”, id);

job();

}

});

Worker {

id,

thread,

}

}

}

實現execute方法

type Job = Box;//修改Job為trait物件的類別名稱

impl ThreadPool {

// ——snip——

pub fn execute(&self, f: F)

where

F: FnOnce() + Send + ‘static

{

let job = Box::new(f);

self。sender。send(job)。unwrap();

}

}

完整程式碼

src/main。rs

use std::fs;

use std::io::;

use std::net::;

use std::;

use mylib::ThreadPool;

fn handle_client(mut stream: TcpStream) {

let mut buffer = [0; 512];

stream。read(&mut buffer)。unwrap();

let get = b“GET / HTTP/1。1\r\n”;

let (status_line, filename) = if buffer。starts_with(get) {

(“HTTP/1。1 200 OK\r\n\r\n”, “main。html”)

} else {

(“HTTP/1。1 404 NOT FOUND\r\n\r\n”, “404。html”)

};

let contents = fs::read_to_string(filename)。unwrap();

let response = format!(“{}{}”, status_line, contents);

stream。write(response。as_bytes())。unwrap();

stream。flush()。unwrap();

let ten_millis = time::Duration::from_millis(10000);

thread::sleep(ten_millis);

}

fn main() -> std::io::Result {

let listener = TcpListener::bind(“127。0。0。1:8080”)?;

// let mut thread_vec: Vec> = Vec::new();

let pool = ThreadPool::new(4);

for stream in listener。incoming() {

// // handle_client(stream?);

let stream = stream。unwrap();

// let handle = thread::spawn(|| {

// handle_client(stream);

// });

// thread_vec。push(handle);

pool。execute(|| {

handle_client(stream);

});

}

// for handle in thread_vec {

// handle。join()。unwrap();

// }

Ok(())

}

src/mylib/lib。rs

use std::thread;

use std::sync::mpsc;

use std::sync::Arc;

use std::sync::Mutex;

struct Worker {

id: usize,

thread: thread::JoinHandle,

}

impl Worker {

// fn new(id: usize) -> Worker {

// let thread = thread::spawn(|| {});

// Worker {

// id,

// thread,

// }

// }

// fn new(id: usize, receiver: mpsc::Receiver) -> Worker {

// let thread = thread::spawn(|| {

// receiver;

// });

// Worker {

// id,

// thread,

// }

// }

fn new(id: usize, receiver: Arc>>) -> Worker {

let thread = thread::spawn(move || {

loop {

let job = receiver。lock()。unwrap()。recv()。unwrap();

println!(“Worker {} got a job; executing。”, id);

job();

}

});

Worker {

id,

thread,

}

}

}

pub struct ThreadPool {

workers: Vec,

sender: mpsc::Sender,

}

// struct Job;

type Job = Box;//修改Job為trait物件的類別名稱

impl ThreadPool {

pub fn new(size: usize) -> ThreadPool {

assert!(size > 0);

// let mut threads = Vec::with_capacity(size);

// for _ in 0。。size {

// //建立執行緒:

// //問題來了,建立執行緒的時候需要傳入閉包,也就是具體做的動作,

// //可是這個時候我們還沒有具體的任務,怎麼辦?

// }

// ThreadPool {

// threads

// }

let (sender, receiver) = mpsc::channel();

let receiver = Arc::new(Mutex::new(receiver));

let mut workers = Vec::with_capacity(size);

for id in 0。。size {

//workers。push(Worker::new(id));

//workers。push(Worker::new(id, receiver));

workers。push(Worker::new(id, Arc::clone(&receiver)));

}

ThreadPool {

workers,

sender,

}

}

pub fn execute(&self, f: F)

where

F: FnOnce() + Send + ’static

{

let job = Box::new(f);

self。sender。send(job)。unwrap();

}

}

在main的Cargo。toml新增如下依賴:

[dependencies]

mylib =

當前版本存在的問題

執行緒池中的執行緒怎麼結束?

想知道如何解決這個問題,請關注令狐一衝,下回為您分解。