Rust中多線程?Web?服務(wù)器的項(xiàng)目實(shí)戰(zhàn)
前情提要:http://chabaoo.cn/program/34427748j.htm
單線程 Web 服務(wù)器將依次處理每個(gè)請(qǐng)求,這意味著在第一個(gè)連接完成處理之前,它不會(huì)處理第二個(gè)連接。如果服務(wù)器接收到越來越多的請(qǐng)求,那么串行執(zhí)行將越來越不理想。如果服務(wù)器接收到一個(gè)需要很長(zhǎng)時(shí)間來處理的請(qǐng)求,則后續(xù)請(qǐng)求將不得不等待,直到長(zhǎng)請(qǐng)求完成,即使新請(qǐng)求可以快速處理。我們需要解決這個(gè)問題,但首先我們要看看實(shí)際的問題。
模擬慢速請(qǐng)求
我們將了解處理緩慢的請(qǐng)求如何影響對(duì)單線程 Web 服務(wù)器實(shí)現(xiàn)的其他請(qǐng)求。
我們使用模擬的緩慢響應(yīng)實(shí)現(xiàn)了對(duì) /sleep 的請(qǐng)求處理,該響應(yīng)將導(dǎo)致服務(wù)器在響應(yīng)之前休眠 5 s。
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn handle_connection(mut stream: TcpStream) { // --snip-- let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- }
新增了一種對(duì) /sleep 請(qǐng)求的響應(yīng),當(dāng)接收到該請(qǐng)求時(shí),服務(wù)器將在呈現(xiàn) hello.html 之前休眠 5 s。
使用 cargo run 啟動(dòng)服務(wù)器。然后打開兩個(gè)瀏覽器窗口:一個(gè)用于 127.0.0.1:7878,另一個(gè)用于 127.0.0.1:7878/sleep。如果像以前一樣多次輸入 / URI,您將看到它快速響應(yīng)。但是如果你輸入 /sleep,然后加載 /,你會(huì)看到 / 等待,直到 sleep 了整整 5 s 才加載。
我們要實(shí)現(xiàn)一個(gè)線程池,避免慢速請(qǐng)求后面的請(qǐng)求等待。
使用線程池提高吞吐量
線程池是一組正在等待并準(zhǔn)備處理任務(wù)的派生線程。當(dāng)程序接收到一個(gè)新任務(wù)時(shí),它將池中的一個(gè)線程分配給該任務(wù),該線程將處理該任務(wù)。池中的剩余線程可用于處理在第一個(gè)線程正在處理時(shí)進(jìn)入的任何其他任務(wù)。當(dāng)?shù)谝粋€(gè)線程完成其任務(wù)的處理后,它將返回到空閑線程池,準(zhǔn)備處理新任務(wù)。線程池允許您并發(fā)地處理連接,從而提高服務(wù)器的吞吐量。
我們將限制池中的線程數(shù)量,因?yàn)榉?wù)器的資源是有限的,也保護(hù)我們免受 DoS 攻擊。進(jìn)入的請(qǐng)求被發(fā)送到池中進(jìn)行處理,線程池將維護(hù)一個(gè)傳入請(qǐng)求隊(duì)列,池中的每個(gè)線程將從這個(gè)隊(duì)列中彈出一個(gè)請(qǐng)求,處理該請(qǐng)求,然后向隊(duì)列請(qǐng)求另一個(gè)請(qǐng)求。使用這種設(shè)計(jì),我們最多可以并發(fā)處理 N 個(gè)請(qǐng)求,其中 N 是線程數(shù)。
這種技術(shù)只是提高 Web 服務(wù)器吞吐量的眾多方法之一。其他選項(xiàng)包括 fork/join 模型、單線程異步 I/O 模型和多線程異步 I/O 模型,等等。
初步嘗試:為每個(gè)請(qǐng)求生成一個(gè)線程
首先,讓我們探索一下,如果為每個(gè)連接創(chuàng)建一個(gè)新線程,我們的代碼會(huì)是什么樣子。正如前面提到的,這不是我們的最終計(jì)劃,因?yàn)榭赡軙?huì)產(chǎn)生無限數(shù)量的線程,但這是一個(gè)起點(diǎn),可以首先獲得一個(gè)工作的多線程服務(wù)器。然后我們將添加線程池作為改進(jìn),并且比較兩種解決方案會(huì)更容易。
在單線程 Web 服務(wù)器的 main 函數(shù)中進(jìn)行修改:
use std::thread; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } }
thread::spawn 將創(chuàng)建一個(gè)新線程,然后在新線程中運(yùn)行閉包中的代碼。
如果運(yùn)行這段代碼并在瀏覽器中加載 /sleep,然后在另外兩個(gè)瀏覽器選項(xiàng)卡中加載 /,對(duì) / 的請(qǐng)求不必等待 /sleep 完成。然而,正如我們所提到的,這最終將使系統(tǒng)不堪重負(fù),因?yàn)槟銓o限制地創(chuàng)建新線程。
現(xiàn)在,是時(shí)候讓 async 和 await 真正發(fā)揮作用了!
實(shí)現(xiàn)線程池的定義和函數(shù)聲明
我們的線程池的實(shí)現(xiàn)將獨(dú)立于我們的 Web 服務(wù)器正在做的工作。
創(chuàng)建一個(gè)src/lib.rs,先實(shí)現(xiàn)一個(gè) ThreadPool 結(jié)構(gòu)體的定義,以及 ThreadPool::new 函數(shù),其參數(shù) size 表示線程池內(nèi)線程的最大數(shù)量。
pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } }
我們將實(shí)現(xiàn) execute 函數(shù),它接受給定的閉包,并將其交給池中的空閑線程運(yùn)行。該函數(shù)類似于標(biāo)準(zhǔn)庫(kù) thread::spawn 函數(shù)。
我們可以將閉包作為具有三個(gè)不同特征的參數(shù):Fn、FnMut 和 FnOnce。我們需要決定在這里使用哪種閉包。我們可以看看 thread::spawn 的簽名對(duì)它的參數(shù)有什么限制。文檔向我們展示了以下內(nèi)容:
impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
F 類型參數(shù)是我們關(guān)心的,T 類型參數(shù)與返回值有關(guān),我們不關(guān)心這個(gè)。我們可以看到 spawn 使用 FnOnce 作為 f 上的 trait 約束。因?yàn)槲覀冏罱K將在 execute 中獲得的參數(shù)傳遞給 spawn,并且運(yùn)行請(qǐng)求的線程只會(huì)執(zhí)行該請(qǐng)求的閉包一次,所以 FnOnce 是我們想要使用的 trait。
F 類型參數(shù)也有 Send trait 約束和 static 生命周期約束,這在我們的情況下很有用:我們需要 Send 來將閉包從一個(gè)線程轉(zhuǎn)移到另一個(gè)線程,而需要 static 是因?yàn)槲覀儾恢谰€程執(zhí)行需要多長(zhǎng)時(shí)間。讓我們?cè)?ThreadPool 上創(chuàng)建一個(gè)execute方法,它將接受 F 類型的泛型參數(shù),并具有以下約束:
impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
我們?nèi)匀辉?FnOnce 之后使用 (),因?yàn)檫@個(gè) FnOnce 表示一個(gè)閉包,它不接受參數(shù),返回單元類型 ()。就像函數(shù)定義一樣,返回類型可以從簽名中省略,但即使沒有參數(shù),仍然需要括號(hào)。
ThreadPool 結(jié)構(gòu)體的定義和兩個(gè)函數(shù)的聲明已經(jīng)完成,使用 ThreadPool 結(jié)構(gòu)體代替 thread::spawn 的假設(shè)接口。
修改 main.rs 中的代碼:
use multi_thread_web_server::ThreadPool; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } }
我們使用 ThreadPool::new 創(chuàng)建一個(gè)新的線程池,可配置的線程數(shù)為 4 個(gè)。然后,在 for 循環(huán)中,pool.execute 有一個(gè)類似 thread::spawn 的接口,因?yàn)樗邮芤粋€(gè)閉包,處理每一個(gè) stream。
運(yùn)行 cargo build,編譯通過了。
驗(yàn)證 new 中的線程數(shù)
前面我們?yōu)?size 參數(shù)選擇了 unsigned 類型,因?yàn)榫€程數(shù)為負(fù)數(shù)的池沒有意義。然而,一個(gè)沒有線程的池也沒有意義,所以在返回 ThreadPool 實(shí)例之前,我們將添加代碼來檢查 size 是否大于 0,并通過 assert 讓程序在接收到 0 時(shí) panic。
impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --snip-- }
我們還為 ThreadPool 添加了一些文檔和文檔注釋。運(yùn)行 cargo doc --open,在打開的 HTML 文檔中點(diǎn)擊 ThreadPool 就能查看它的一些介紹信息。
我們也可以將 new 更改為 build 并返回一個(gè) Result,就像下面的定義一樣:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
但是在這種情況下,我們嘗試創(chuàng)建一個(gè)沒有任何線程的線程池是不合理的,我們希望在錯(cuò)誤時(shí) panic。
創(chuàng)建存儲(chǔ)線程的空間
既然我們有辦法知道池中存儲(chǔ)了有效數(shù)量的線程,我們就可以創(chuàng)建這些線程,并在返回結(jié)構(gòu)體之前將它們存儲(chǔ)在 ThreadPool 結(jié)構(gòu)體中。
但是我們?nèi)绾?ldquo;存儲(chǔ)”一個(gè)線程呢?讓我們?cè)倏匆幌?thread::spawn 簽名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,
spawn 函數(shù)返回一個(gè) JoinHandle<T>,其中 T 是閉包返回的類型。我們也使用 JoinHandle,因?yàn)槲覀儌鬟f給線程池的閉包將處理連接而不返回任何東西,因此 T 將是單元類型 ()。
修改 ThreadPool 的定義,使其包含 thread::JoinHandle<()> 實(shí)例的 vector。
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, }
再修改 new 函數(shù),初始化 vector 的容量為 size,設(shè)置 for 循環(huán),運(yùn)行一些代碼來創(chuàng)建線程,并返回一個(gè)包含它們的 ThreadPool 實(shí)例。
pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // create some threads and store them in the vector } ThreadPool { threads } }
再次運(yùn)行 cargo build,編譯成功。
負(fù)責(zé)將代碼從線程池發(fā)送到線程的 Worker 結(jié)構(gòu)體
標(biāo)準(zhǔn)庫(kù)的 thread::spawn 期望得到一些代碼,這些代碼應(yīng)該在線程創(chuàng)建后立即運(yùn)行。然而,在本例中,我們希望創(chuàng)建線程并讓它們等待稍后發(fā)送的代碼。
我們將通過在 ThreadPool 和管理這種新行為的線程之間引入一個(gè)新的數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn)這種行為。我們將此數(shù)據(jù)結(jié)構(gòu)稱為 Worker,這是池實(shí)現(xiàn)中的一個(gè)常用術(shù)語(yǔ)。Worker 獲取需要運(yùn)行的代碼,并在 Worker 的線程中運(yùn)行這些代碼。
我們將存儲(chǔ) Worker 結(jié)構(gòu)的實(shí)例,而不是在線程池中存儲(chǔ) JoinHandle<()> 實(shí)例的 vector。每個(gè) Worker 將存儲(chǔ)一個(gè) JoinHandle<()> 實(shí)例。然后,我們將在 Worker 上實(shí)現(xiàn)一個(gè)方法,該方法將接受代碼的閉包來運(yùn)行,并將其發(fā)送到已經(jīng)運(yùn)行的線程中執(zhí)行。我們還將為每個(gè) Worker 提供一個(gè) id,以便在進(jìn)行日志記錄或調(diào)試時(shí)區(qū)分池中 Worker 的不同實(shí)例。
總結(jié)一下,我們要實(shí)現(xiàn)這四件事:
- 定義一個(gè) Worker 結(jié)構(gòu)體,它包含一個(gè) id 和一個(gè) JoinHandle<()>.
- 更改 ThreadPool 的定義,包含一個(gè)Worker 實(shí)例的 vector,而不是 Vec<thread::JoinHandle<()>>。
- 定義一個(gè) Worker::new 函數(shù),該函數(shù)接受一個(gè) id 號(hào),并返回一個(gè)包含該 id 的 Worker 實(shí)例和一個(gè)由空閉包派生的線程。
- 在 ThreadPool::new 函數(shù)中,使用 for 循環(huán)計(jì)數(shù)器生成一個(gè) id,用該 id 創(chuàng)建一個(gè)新的 Worker,并將該 Worker 存儲(chǔ)在 vector 中。
use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }
外部代碼不需要知道在 ThreadPool 中使用 Worke r結(jié)構(gòu)體的實(shí)現(xiàn)細(xì)節(jié),所以我們把 Worker 結(jié)構(gòu)體及其函數(shù)設(shè)為 private。Worker::new 函數(shù)使用我們給它的 id,并存儲(chǔ)一個(gè) JoinHandle<()> 實(shí)例,該實(shí)例是通過使用空閉包生成一個(gè)新線程創(chuàng)建的。
我們將 ThreadPool 上的字段名稱從 threads 更改為 workers,因?yàn)樗F(xiàn)在保存 Worker 實(shí)例而不是 JoinHandle<()> 實(shí)例。
注意,如果操作系統(tǒng)因?yàn)闆]有足夠的系統(tǒng)資源而無法創(chuàng)建線程,thread::spawn 將出現(xiàn) panic,這在實(shí)際生產(chǎn)環(huán)境中很危險(xiǎn)。實(shí)際情況下,我們可以使用 std::thread::Builder 及其派生方法。
這段代碼將編譯并存儲(chǔ)作為 ThreadPool::new 參數(shù)指定的 Worker 實(shí)例的數(shù)量。但是我們?nèi)匀粵]有處理在 execute 中得到的閉包。讓我們看看接下來該怎么做。
通過通道向線程發(fā)送請(qǐng)求
我們希望剛剛創(chuàng)建的 Worker 結(jié)構(gòu)體從 ThreadPool 中保存的隊(duì)列中獲取要運(yùn)行的代碼,并將該代碼發(fā)送到其線程中運(yùn)行。
我們將使用通道作為作業(yè)隊(duì)列,execute 將把作業(yè)從 ThreadPool 發(fā)送到 Worker 實(shí)例,后者將把作業(yè)發(fā)送到它的線程。計(jì)劃如下:
- ThreadPool 將創(chuàng)建一個(gè)通道并保持發(fā)送端。
- 每個(gè) Worker 獲取 receiver,作為接收端。
- 我們將創(chuàng)建一個(gè)新的 Job 結(jié)構(gòu)體來保存我們想要發(fā)送到通道中的閉包。
- execute 方法將通過發(fā)送端發(fā)送它想要執(zhí)行的作業(yè)。
- 在它的線程中,Worker 將遍歷它的接收者,并執(zhí)行它接收到的所有作業(yè)的閉包。
讓我們首先在 ThreadPool::new 中創(chuàng)建一個(gè)通道,并在 ThreadPool 實(shí)例中保存發(fā)送端。Job 結(jié)構(gòu)現(xiàn)在還沒有保存任何東西,但它將是我們發(fā)送到通道的項(xiàng)的類型。
use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender } } // --snip-- }
在 ThreadPool::new 中,我們創(chuàng)建了一個(gè)通道,并讓 ThreadPool 包含 sender。這將成功編譯。
讓我們嘗試在線程池創(chuàng)建通道時(shí)將通道的接收器傳遞給每個(gè) Worker。我們知道我們想要在 Worker 實(shí)例產(chǎn)生的線程中使用 receiver,所以我們將在閉包中引用 receiver 參數(shù)。
impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
代碼試圖將 receiver 傳遞給多個(gè) Worker 實(shí)例,這是行不通的,因?yàn)?Rust 提供的通道實(shí)現(xiàn)是多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者。這意味著我們不能僅僅克隆通道的消費(fèi)端(接收端)來修復(fù)此代碼。我們也不想多次向多個(gè)消費(fèi)者發(fā)送消息。我們想要一個(gè)包含多個(gè) Worker 實(shí)例的消息列表,這樣每個(gè)消息都會(huì)被處理一次。
此外,從通道隊(duì)列中取出作業(yè)涉及到改變 receiver,因此線程需要一種安全的方式來共享和修改 receiver。
為了在多個(gè)線程之間共享所有權(quán)并允許線程改變值,我們需要使用 Arc<Mutex<T>> 。Arc 類型將允許多個(gè) Worker 實(shí)例擁有 receiver,Mutex 將確保一次只有一個(gè) Worker 從接收器獲得作業(yè)。
use std::{ sync::{Arc, Mutex, mpsc}, thread, }; // --snip-- impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- } }
在 ThreadPool::new 中,我們將接收者置于 Arc<Mutex<>> 中。對(duì)于每個(gè)新的 Worker,我們克隆 Arc 來增加引用計(jì)數(shù),這樣 Worker 實(shí)例就可以共享 receiver 的所有權(quán)。
有了這些修改,代碼就可以編譯了。
實(shí)現(xiàn) execute 方法
最后讓我們實(shí)現(xiàn) ThreadPool::execute 方法。我們還將 Job 從結(jié)構(gòu)體更改為 trait 對(duì)象的類型別名,該 trait 對(duì)象保存 execute 接收的閉包類型。
// --snip-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } }
在使用獲得的閉包創(chuàng)建新 Job 實(shí)例之后,我們將該作業(yè)發(fā)送到通道中。在 send 失敗的情況下我們調(diào)用 unwrap。
如果我們停止執(zhí)行所有線程,這意味著接收端已經(jīng)停止接收新消息,就可能發(fā)生這種情況。目前,我們不能停止線程的執(zhí)行:只要池存在,線程就會(huì)繼續(xù)執(zhí)行。我們使用 unwrap 的原因是我們知道失敗情況不會(huì)發(fā)生,但是編譯器不知道。
但我們還沒有完全完成!在 Worker 中,傳遞給 thread::spawn 的閉包仍然只引用通道的接收端。相反,我們需要閉包永遠(yuǎn)循環(huán),向通道的接收端請(qǐng)求作業(yè),并在獲得作業(yè)時(shí)運(yùn)行作業(yè)。讓我們對(duì) Worker::new 函數(shù)進(jìn)行如下的更改。
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } }
在這里,我們首先在 receiver 上調(diào)用 lock 來獲取互斥鎖,然后調(diào)用 unwrap 來在出現(xiàn)錯(cuò)誤時(shí)發(fā)出警報(bào)。
如果互斥鎖處于鎖定狀態(tài),獲取鎖可能會(huì)失敗,如果其他線程在持有鎖而不是釋放鎖時(shí) panic,就會(huì)發(fā)生這種情況。在這種情況下,調(diào)用 unwrap 使該線程 panic 是正確的操作。你也可以將此 unwrap 更改為 expect,并顯示有意義的錯(cuò)誤消息。
如果我們獲得了互斥鎖,我們調(diào)用 recv 從通道接收 Job。如果持有發(fā)送方的線程已經(jīng)關(guān)閉,那么最后的 unwrap 也會(huì)跳過這里的任何錯(cuò)誤,類似于如果接收方關(guān)閉,則 send 方法返回 Err。
對(duì) recv 的調(diào)用會(huì)阻塞當(dāng)前線程,直到有作業(yè)可用。Mutex<T> 確保一次只有一個(gè) Worker 線程試圖請(qǐng)求作業(yè)。
至此,多線程 Web 服務(wù)器已經(jīng)能成功運(yùn)行了?,F(xiàn)在我們有了一個(gè)異步執(zhí)行連接的線程池。創(chuàng)建的線程永遠(yuǎn)不會(huì)超過 4 個(gè),所以如果服務(wù)器接收到大量請(qǐng)求,我們的系統(tǒng)也不會(huì)過載,但也不會(huì)停止。在瀏覽器打開多個(gè)網(wǎng)頁(yè),程序輸出一些執(zhí)行的信息:
Worker 0 got a job; executing. Worker 1 got a job; executing. Worker 2 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. ...
你可能想知道為什么不按照下面所示的方式編寫工作線程代碼。
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } }
這段代碼可以編譯和運(yùn)行,但不會(huì)產(chǎn)生期望的線程行為:緩慢的請(qǐng)求仍然會(huì)導(dǎo)致其他請(qǐng)求等待處理。原因有些微妙:Mutex 沒有公共解鎖方法,因?yàn)殒i的所有權(quán)是基于鎖方法返回的 LockResult<MutexGuard<T>> 中的 MutexGuard<T> 的生命周期。在編譯時(shí),借用檢查器可以強(qiáng)制執(zhí)行由互斥鎖保護(hù)的資源不能被訪問的規(guī)則,除非我們持有該鎖。但是,如果我們不注意 MutexGuard<T>的生命周期,這種實(shí)現(xiàn)也會(huì)導(dǎo)致鎖被持有的時(shí)間比預(yù)期的要長(zhǎng)。
之前的代碼使用 let job = receiver.lock().unwrap().recv().unwrap();
之所以有效,是因?yàn)槭褂?let 時(shí),在等號(hào)右側(cè)的表達(dá)式中使用的任何臨時(shí)值都會(huì)在 let 語(yǔ)句結(jié)束時(shí)立即刪除。然而,while let(以及 if let 和 match)在相關(guān)塊結(jié)束之前不會(huì)刪除臨時(shí)值。在使用 while let 的代碼中,鎖在調(diào)用 job() 期間保持持有,這意味著其他 Worker 實(shí)例不能接收作業(yè)。
正常關(guān)機(jī)和清理
接下來,我們將實(shí)現(xiàn) Drop trait,在池中的每個(gè)線程上調(diào)用 join,這樣它們就可以在關(guān)閉之前完成正在處理的請(qǐng)求。然后我們將實(shí)現(xiàn)一種方法來告訴線程它們應(yīng)該停止接受新請(qǐng)求并關(guān)閉。要查看這段代碼的實(shí)際效果,我們將修改服務(wù)器,使其在優(yōu)雅地關(guān)閉線程池之前只接受兩個(gè)請(qǐng)求。
在 ThreadPool 上實(shí)現(xiàn)D rop trait
讓我們從在線程池上實(shí)現(xiàn) Drop 開始。當(dāng)池被刪除時(shí),我們的線程都應(yīng)該連接起來,以確保它們完成自己的工作。
impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }
我們循環(huán)遍歷線程池的每個(gè) worker。我們使用 &mut 是因?yàn)?self 是一個(gè)可變引用,而且我們還需要能夠改變 worker。對(duì)于每個(gè) worker,我們打印一條消息,表示這個(gè)特定的 worker 實(shí)例正在關(guān)閉,然后我們?cè)谠?worker 實(shí)例的線程上調(diào)用 join。如果 join 調(diào)用失敗,我們使用 unwrap 使 Rust 陷入 panic,并進(jìn)入不正常的關(guān)閉狀態(tài)。
然而,程序并不能成功編譯:
這個(gè)錯(cuò)誤告訴我們不能調(diào)用 join,因?yàn)槲覀冎挥忻總€(gè) worker 的可變借用,join 擁有其參數(shù)的所有權(quán)。為了解決這個(gè)問題,我們需要將線程移出擁有線程的 Worker 實(shí)例,以便 join 可以使用線程。
一種解決方法是使用 Option。如果 Worker 持有 Option<thread::JoinHandle<()>>,我們可以調(diào)用 Optio n的 take 方法將值從 Some 變體中移出,并在其位置留下 None 變體。換句話說,正在運(yùn)行的 Worker 在線程中會(huì)有一個(gè) Some 變量,當(dāng)我們想要清理 Worker 時(shí),我們將 Some 替換為 None,這樣 Worker 就不會(huì)有線程要運(yùn)行了。
然而,只有在丟棄 Worker 時(shí)才會(huì)出現(xiàn)這種情況。使用 Option 之后,我們必須在訪問 worker.thread 的任何地方處理 Option<thread::JoinHandle<()>>,這很繁瑣。
在這種情況下,存在一個(gè)更好的替代方法:Vec::drain 方法。它接受一個(gè) range 參數(shù)來指定要從Vec中刪除哪些項(xiàng),并返回這些項(xiàng)的迭代器。傳遞 .. 將從 Vec 中刪除所有值。
所以我們需要像這樣更新 ThreadPool 的 Drop 實(shí)現(xiàn):
impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }
這將解決編譯器錯(cuò)誤,并且不需要對(duì)代碼進(jìn)行任何其他更改。
向線程發(fā)出停止監(jiān)聽作業(yè)的信號(hào)
程序還沒有按照我們想要的方式運(yùn)行。關(guān)鍵是由 Worker 實(shí)例的線程運(yùn)行的閉包中的邏輯:目前,我們調(diào)用 join,但這不會(huì)關(guān)閉線程,因?yàn)樗鼈冇肋h(yuǎn)在循環(huán)尋找作業(yè)。如果我們嘗試使用當(dāng)前的 Drop 實(shí)現(xiàn)來刪除 ThreadPool,主線程將永遠(yuǎn)阻塞,等待第一個(gè)線程完成。
為了解決這個(gè)問題,我們需要改變 ThreadPool Drop 的實(shí)現(xiàn),在等待線程完成之前顯式地刪除 sender。然后再改變 Worker 中的 loop。
pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } // --snip-- impl ThreadPool { pub fn new(size: usize) -> ThreadPool { // --snip-- ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }
與線程不同,這里我們需要使用 Option::take 來將 sender 移出 ThreadPool。
刪除 sender 將關(guān)閉通道,這表明將不再發(fā)送消息。當(dāng)這種情況發(fā)生時(shí), Worker 實(shí)例在 loop 中對(duì) recv 的所有調(diào)用都會(huì)返回一個(gè)錯(cuò)誤。在這種情況下,我們應(yīng)該優(yōu)雅地退出循環(huán),這意味著線程將在 ThreadPool Drop 實(shí)現(xiàn)調(diào)用 join 時(shí)結(jié)束。
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } } }); Worker { id, thread } } }
要查看這段代碼的實(shí)際效果,讓我們修改 main 函數(shù),使其在優(yōu)雅地關(guān)閉服務(wù)器之前只接受兩個(gè)請(qǐng)求。
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); }
take 方法是在 Iterator trait 中定義的,它將迭代最多限制在前兩項(xiàng)。 ThreadPool 將在 main 函數(shù)結(jié)束時(shí)超出作用域,并運(yùn)行 Drop實(shí)現(xiàn)。
啟動(dòng)裝載運(yùn)行的服務(wù)器,并發(fā)出三個(gè)請(qǐng)求。第三個(gè)請(qǐng)求應(yīng)該出錯(cuò),程序輸出為:
Worker 0 got a job; executing. Shutting down. Worker 1 got a job; executing. Shutting down worker 0 Worker 3 disconnected; shutting down. Worker 2 disconnected; shutting down. Worker 0 disconnected; shutting down. Shutting down worker 1 Worker 1 disconnected; shutting down. Shutting down worker 2 Shutting down worker 3
打印的 Worker id 和消息可能有不同順序。我們可以從消息中看到這段代碼是如何工作的:Worker 實(shí)例 0 和 1 獲得了前兩個(gè)請(qǐng)求。服務(wù)器在第二個(gè)連接之后停止接受連接,線程池上的 Drop 實(shí)現(xiàn)甚至在 Worker 1 開始它的工作之前就開始執(zhí)行。刪除發(fā)送器將斷開所有 Worker 實(shí)例的連接,并告訴它們關(guān)閉。每個(gè) Worker 實(shí)例在斷開連接時(shí)打印一條消息,然后線程池調(diào)用 join 來等待每個(gè) Worker 線程完成。
注意這個(gè)特殊執(zhí)行的一個(gè)有趣的方面:ThreadPool 丟棄了 sender,并且在任何 Worker 接收到錯(cuò)誤之前,我們嘗試加入 Worker 0。工作線程 0 還沒有從 recv 獲得錯(cuò)誤,所以主線程阻塞等待工作線程 0 完成。同時(shí),Worker 1 收到了一個(gè)作業(yè),然后所有線程都收到了一個(gè)錯(cuò)誤。當(dāng) Worker 0 完成時(shí),主線程等待其余的 Worker 實(shí)例完成。在這一點(diǎn)上,他們都退出了循環(huán),停止了。
我們現(xiàn)在已經(jīng)完成了我們的項(xiàng)目;我們有一個(gè)基本的 Web 服務(wù)器,它使用線程池進(jìn)行異步響應(yīng)。我們能夠執(zhí)行服務(wù)器的優(yōu)雅關(guān)閉,這將清理池中的所有線程。
項(xiàng)目地址
GitHub:UestcXiye / Multi-Thread-Web-Server-based-on-Rust
到此這篇關(guān)于Rust中多線程 Web 服務(wù)器的項(xiàng)目實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)Rust 多線程Web服務(wù)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Rust中的Box<T>之堆上的數(shù)據(jù)與遞歸類型詳解
本文介紹了Rust中的Box<T>類型,包括其在堆與棧之間的內(nèi)存分配,性能優(yōu)勢(shì),以及如何利用Box<T>來實(shí)現(xiàn)遞歸類型和處理大小未知類型,通過Box<T>,Rust程序員可以更靈活地管理內(nèi)存,避免編譯時(shí)大小不確定的問題,并提高代碼的效率和靈活性2025-02-02Rust中類型轉(zhuǎn)換在錯(cuò)誤處理中的應(yīng)用小結(jié)
隨著項(xiàng)目的進(jìn)展,關(guān)于Rust的故事又翻開了新的一頁(yè),今天來到了服務(wù)器端的開發(fā)場(chǎng)景,發(fā)現(xiàn)錯(cuò)誤處理中的錯(cuò)誤類型轉(zhuǎn)換有必要分享一下,對(duì)Rust錯(cuò)誤處理相關(guān)知識(shí)感興趣的朋友一起看看吧2023-09-09如何使用bindgen將C語(yǔ)言頭文件轉(zhuǎn)換為Rust接口代碼
這篇文章主要介紹了使用bindgen將C語(yǔ)言頭文件轉(zhuǎn)換為Rust接口代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-01-01Rust?語(yǔ)言中符號(hào)?::?的使用場(chǎng)景解析
Rust?是一種強(qiáng)調(diào)安全性和速度的系統(tǒng)編程語(yǔ)言,這篇文章主要介紹了Rust?語(yǔ)言中符號(hào)?::?的使用場(chǎng)景,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2024-03-03Rust-使用dotenvy加載和使用環(huán)境變量的過程詳解
系統(tǒng)的開發(fā),測(cè)試和部署離不開環(huán)境變量,今天分享在Rust的系統(tǒng)開發(fā)中,使用dotenvy來讀取和使用環(huán)境變量,感興趣的朋友跟隨小編一起看看吧2023-11-11