Rust并發(fā)編程之使用消息傳遞進(jìn)行線程間數(shù)據(jù)共享方式
一、通道(Channel)的基本概念
一個(gè)通道可以想象成一條單向水道或河流:有一個(gè) 發(fā)送端(transmitter) 和一個(gè) 接收端(receiver)。發(fā)送端好比河流上游,負(fù)責(zé)把“橡皮鴨”丟進(jìn)水里;接收端在河流下游,收到這只“橡皮鴨”。在編程中,線程之間的通信即是這樣——把數(shù)據(jù)發(fā)到通道的一端,另外一個(gè)(或多個(gè))線程在通道的另一端接收。
Rust 通過(guò) std::sync::mpsc
(Multiple Producer, Single Consumer)來(lái)提供通道功能:
- Multiple Producer:可以有多個(gè)發(fā)送端同時(shí)發(fā)送數(shù)據(jù);
- Single Consumer:但只能有一個(gè)接收端來(lái)接收數(shù)據(jù)。
通過(guò)克隆發(fā)送端可以允許多個(gè)線程一起發(fā)送數(shù)據(jù)給同一個(gè)接收端。
二、創(chuàng)建并使用通道
1. 基礎(chǔ)用法
創(chuàng)建一個(gè) mpsc::channel
use std::sync::mpsc; use std::thread; fn main() { // 創(chuàng)建一個(gè)通道 let (tx, rx) = mpsc::channel(); // 這里的 tx 是 transmitter(發(fā)送端),rx 是 receiver(接收端)。 // 我們先不發(fā)送任何數(shù)據(jù),因此代碼暫時(shí)無(wú)法編譯, // 因?yàn)榫幾g器不知道通道要發(fā)送什么類型的數(shù)據(jù)。 }
mpsc::channel()
函數(shù)會(huì)返回一個(gè)元組 (tx, rx)
,分別代表發(fā)送端和接收端。在之后,我們會(huì)看到如何把 tx
移動(dòng)到不同線程去發(fā)送消息,rx
則留在當(dāng)前線程用于接收消息。
2. 在子線程中發(fā)送消息
下面的例子中,我們?cè)谥骶€程創(chuàng)建了通道,然后把發(fā)送端 tx
移動(dòng)(move
)到子線程中,子線程通過(guò) tx.send()
發(fā)送一條字符串“hi”給主線程。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); // 如果接收端已關(guān)閉,則 send 會(huì)返回錯(cuò)誤,這里直接 unwrap 處理 }); // 在主線程接收消息 let received = rx.recv().unwrap(); // recv 會(huì)阻塞主線程,直到收到一條消息或者發(fā)送端被關(guān)閉 println!("Got: {}", received); }
運(yùn)行后,主線程會(huì)打?。?/p>
Got: hi
這表示主線程成功地收到了子線程通過(guò)通道發(fā)送的字符串。
3. 通道與所有權(quán)
當(dāng)我們調(diào)用 tx.send(val)
時(shí),send
方法會(huì)獲取 val
的所有權(quán)。這樣做能夠避免在另一個(gè)線程修改數(shù)據(jù)后,我們?cè)谠€程又使用這段數(shù)據(jù)的潛在風(fēng)險(xiǎn)。
例如,下面這段示例代碼(示意)試圖在發(fā)送之后繼續(xù)使用 val
,就會(huì)導(dǎo)致編譯錯(cuò)誤:
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); let val = String::from("hello"); thread::spawn(move || { tx.send(val).unwrap(); // 發(fā)送后 val 的所有權(quán)已轉(zhuǎn)移到通道 // println!("val is: {}", val); // 編譯錯(cuò)誤: val 所有權(quán)已經(jīng)被移動(dòng) }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
由于所有權(quán)已經(jīng)轉(zhuǎn)移,主線程可以安全地接收并處理這條消息,而子線程也不會(huì)再訪問(wèn)已經(jīng)移出的數(shù)據(jù)。這種嚴(yán)格的所有權(quán)規(guī)則能有效避免數(shù)據(jù)競(jìng)爭(zhēng)和其他并發(fā)錯(cuò)誤。
4. 發(fā)送多個(gè)消息
我們可以讓子線程發(fā)送不止一條消息。下面的例子讓子線程依次發(fā)送多條字符串,并在發(fā)送之間加上 sleep
用來(lái)模擬耗時(shí)操作:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); // 主線程中,通過(guò) iter 的方式持續(xù)接收消息 for received in rx { println!("Got: {}", received); } }
由于接收端可以被當(dāng)做迭代器來(lái)使用,當(dāng)所有發(fā)送端都關(guān)閉時(shí),for
循環(huán)會(huì)自動(dòng)結(jié)束。這段程序會(huì)像下面這樣依次打印每條消息:
Got: hi
Got: from
Got: the
Got: thread
5. 多個(gè)發(fā)送端(Multiple Producer)
mpsc
的含義之一就是 Multiple Producer。如果我們希望有多個(gè)不同的子線程來(lái)發(fā)送消息給同一個(gè)接收端,只需要克隆發(fā)送端即可。如下:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("first thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); // 這里使用原先的 tx thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }
- 第一個(gè)子線程使用
tx1
; - 第二個(gè)子線程使用原本的
tx
;
所有發(fā)送過(guò)來(lái)的數(shù)據(jù)都將流向同一個(gè) rx
(接收端)。運(yùn)行結(jié)果每次可能都不一樣,因?yàn)榫€程的調(diào)度順序不可預(yù)測(cè),這也正是并發(fā)編程“有趣”且需要謹(jǐn)慎之處。
三、總結(jié)
- 創(chuàng)建通道:使用
mpsc::channel()
創(chuàng)建通道,獲得(tx, rx)
(發(fā)送端和接收端)。 - 發(fā)送數(shù)據(jù):
tx.send(data)
會(huì)轉(zhuǎn)移data
的所有權(quán),若接收端已關(guān)閉,send
會(huì)返回錯(cuò)誤。 - 接收數(shù)據(jù):
rx.recv()
會(huì)阻塞等待數(shù)據(jù);rx.try_recv()
則不會(huì)阻塞,可用于非阻塞檢查。也可將rx
當(dāng)做迭代器使用,以便持續(xù)接收數(shù)據(jù),直到通道被關(guān)閉。 - 所有權(quán)規(guī)則保障安全:發(fā)送端在
send
時(shí)會(huì)移動(dòng)數(shù)據(jù)的所有權(quán),避免了多線程中對(duì)同一數(shù)據(jù)的潛在不安全訪問(wèn)。 - 多發(fā)送端:通過(guò)克隆發(fā)送端(
tx.clone()
),多個(gè)線程可以各自發(fā)送數(shù)據(jù)到同一個(gè)接收端,從而實(shí)現(xiàn)復(fù)雜的多生產(chǎn)者單消費(fèi)者架構(gòu)。
Rust 的通道借助所有權(quán)系統(tǒng),幫助我們輕松規(guī)避了許多并發(fā)陷阱。通過(guò)消息傳遞的思路,不再需要小心翼翼地管理鎖和共享數(shù)據(jù),編程思路也往往更加清晰簡(jiǎn)潔。在實(shí)際項(xiàng)目中,若需要多個(gè)線程之間相互通信,不妨考慮一下通道(channel)方案,也許能帶來(lái)更加優(yōu)雅和可靠的并發(fā)架構(gòu)。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Rust中的Iterator和IntoIterator介紹及應(yīng)用小結(jié)
Iterator即迭代器,它可以用于對(duì)數(shù)據(jù)結(jié)構(gòu)進(jìn)行迭代,被迭代的數(shù)據(jù)結(jié)構(gòu)是可迭代的(iterable),所謂的可迭代就是這個(gè)數(shù)據(jù)結(jié)構(gòu)有返回迭代器的方法,這篇文章主要介紹了Rust中的Iterator和IntoIterator介紹及應(yīng)用,需要的朋友可以參考下2023-07-07關(guān)于Rust編譯時(shí)報(bào)link.exe?not?found錯(cuò)誤問(wèn)題
這篇文章主要介紹了Rust編譯的時(shí)候報(bào)出link.exe?not?found錯(cuò)誤問(wèn)題,解決方法是在命令行就是CMD執(zhí)行相應(yīng)的命令即可,本文給大家分解決方法,需要的朋友可以參考下2022-09-09