使用?Rust?實現(xiàn)的基礎的List?和?Watch?機制示例流程
使用 Rust 實現(xiàn)的基礎的List 和 Watch 機制
介紹
在日常的開發(fā)過程中,有一個很重要的任務是能夠通過Rust語言實現(xiàn)K8s中的各種生態(tài)組件,在這個過程中,既需要能過夠了解K8S的工作原理也需要能夠知道rust的語言特性。因此,在這個過程中有很多值得探討的知識點。
在這里,第一步,我們將探索如何使用 Rust 實現(xiàn)一個類似于 Kubernetes 的 list 和 watch 機制。我們將通過 WebSocket 實現(xiàn)實時的消息推送,并使用一些關鍵的 Rust 異步編程模型來處理事件和連接管理。
我們首先默認大家能夠了解rust語言的基本特性。下文中,將針對rust的知識點展開進行探討。
目標
- 理解 WebSocket 連接的建立和管理。
- 學習如何通過 WebSocket 推送消息。
- 掌握消息緩存和處理的實現(xiàn)方式。
- 了解如何使用 Rust 實現(xiàn)一個高效的事件分發(fā)系統(tǒng)。
- 理解K8S中的數據一致性保障方法
- 了解本機制的不足,以及后續(xù)如何進行改進
理解問題
什么是 list 和 watch?
- List:列出當前所有資源的狀態(tài)。
- Watch:實時監(jiān)控資源的變化,一旦有資源變化,就會立即通知客戶端。
使用場景
- 自動化運維:實時監(jiān)控系統(tǒng)資源狀態(tài),觸發(fā)自動化運維操作。
- 應用監(jiān)控:實時獲取應用狀態(tài),及時處理異常,在很多的系統(tǒng)設計場景中,能夠減少耦合。
- K8S中的相應設計:K8S中,對相應資源的通知的基礎即為list and watch機制。本人在學習K8S源碼的第一步就是學習這一套設計架構。
分析問題
\當然,通過簡單的代碼僅僅通過http進行主動連接也可實現(xiàn)這個功能。但在目前階段,我們希望能夠設計一個高效的、穩(wěn)定的、可擴展的list and watch體系,因此我們需要考慮以下幾個關鍵問題。
關鍵問題
- 如何建立和管理 我們服務器和客戶端的連接?通過什么方式進行?
- 如何實現(xiàn)高效的消息推送機制?
- 如何處理消息緩存和訂閱管理?
技術選型
- 語言:Rust
- Web 框架:warp框架
- WebSocket實現(xiàn)和框架:tokio-tungstenite、warp
- 異步編程:tokio、管道機制
設計代碼結構
針對以上這個需求,結合目前kunos-system的需求我們闡釋如下
- 有以下幾個資源,Node、Task(Task是一個shell命令、鏡像運行命令的載體)、Job(Task的上層資源,一個Job包含多個Task,類似于K8s中的replicaset)我們需要對這幾個資源的狀態(tài)進行推送。
- 能夠在服務器建立起來一個watch and list服務器,能夠推送各種事件
- 能夠
組件設計
- Broker:管理 WebSocket 訂閱者和事件分發(fā)。
pub struct Broker<R: Resource + Clone + Serialize + Send + Sync + 'static> { // 下游的訂閱者列表,用于發(fā)送websocket信息 subscribers: Arc<RwLock<HashMap<Topic, HashMap<Uuid, WsSender>>>>, // 事件的緩沖流 event_sender: UnboundedSender<(Topic, WatchEvent<R>)>, } - Watcher:對不同資源類型進行管理和操作。
pub struct Watcher { // 為不同的事件建立不同的broker pub node_broker: Arc<Broker<Node>>, pub task_broker: Arc<Broker<Task>>, pub job_broker: Arc<Broker<Job>>, pub exec_broker: Arc<Broker<TaskExecRequest>>, } - WebSocket 客戶端:與服務器交互,接收實時事件。
基本原理
websocket路由入口
let node_subscribe = warp::path!("watch" / "node").and(warp::ws()).map(
move |ws: warp::ws::Ws| {
let node_broker_clone = Arc::clone(&node_broker_clone);
ws.on_upgrade(move |socket| async move {
node_broker_clone.subscribe("node".to_string(), socket).await;
})
},
);1. warp::path!("watch" / "node")
*這部分代碼定義了一個路徑過濾器,用于匹配路徑 /watch/node 的 HTTP 請求。warp::path!是 Warp 框架提供的一個宏,用于簡化路徑定義。這里的"watch" / "node"表示請求路徑必須是/watch/node` 才能匹配這個過濾器。
2. .and(warp::ws())
這一部分代碼將路徑過濾器與 WebSocket 協(xié)議過濾器組合起來。warp::ws() 過濾器會匹配 WebSocket 握手請求并提取一個 warp::ws::Ws 類型,表示 WebSocket 配置。這表示我們的這個路徑將為一個websocket接口。
warp::ws()過濾器用于匹配并提取 WebSocket 握手請求,確保該請求是 WebSocket 協(xié)議請求。
3. .map(move |ws: warp::ws::Ws| { ... })
.map 方法用于將前面的過濾器組合結果映射到一個新的處理邏輯中。這里的 move |ws: warp::ws::Ws| { ... } 是一個閉包,用于處理 WebSocket 請求。
move關鍵字確保閉包捕獲其環(huán)境中的所有變量的所有權,因為這些變量將在異步操作中使用。ws: warp::ws::Ws參數是從前面的warp::ws()過濾器中提取的 WebSocket 配置。
4. ws.on_upgrade(move |socket| async move { ... })
ws.on_upgrade 方法用于將 WebSocket 協(xié)議升級請求處理為 WebSocket 連接。它接受一個閉包作為參數,當 WebSocket 握手成功后,這個閉包會被調用。在官方定義中,這個方法主要用于自定義一個函數對建立后的websocket連接進行一定的操作,因此我們在這里將建立連接后一切操作,比如保持連接,發(fā)送信息等。
/// Finish the upgrade, passing a function to handle the `WebSocket`.
///
/// The passed function must return a `Future`.
pub fn on_upgrade<F, U>(self, func: F) -> impl Reply
where
F: FnOnce(WebSocket) -> U + Send + 'static,
U: Future<Output = ()> + Send + 'static,
{
WsReply {
ws: self,
on_upgrade: func,
}
}move |socket| async move { ... }是一個異步閉包,它將在 WebSocket 連接成功升級后執(zhí)行。socket參數表示已經升級的 WebSocket 連接。
5. node_broker_clone.subscribe("node".to_string(), socket).await;
在異步閉包內部,調用 node_broker_clone 的subscribe` 方法,將新的 WebSocket 連接訂閱到節(jié)點(node)主題中。后續(xù)我們將展開講解
"node".to_string()將節(jié)點主題名稱轉換為字符串。socket參數表示當前的 WebSocket 連接。await關鍵字等待異步訂閱操作完成。
websocket連接處理
上面說到,我們通過 ws.on_upgrade(move |socket| async move { ... })這個方法在連接建立之后進行處理,其中可以知道,我們處理的方法如下所示。
pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
let (ws_sender, mut ws_receiver) = socket.split();
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let subscriber_id = Uuid::new_v4();
?
{
let mut subs = self.subscribers.write().await;
subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
}
?
let subscribers = Arc::clone(&self.subscribers);
tokio::task::spawn(async move {
while let Some(result) = ws_receiver.next().await {
match result {
Ok(message) => {
// 處理有效的消息
if message.is_text() {
println!(
"Received message from client: {}",
message.to_str().unwrap()
);
}
}
Err(e) => {
// 處理錯誤
eprintln!("WebSocket error: {:?}", e);
break;
}
}
}
println!("WebSocket connection closed");
subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
});
?
tokio::task::spawn(async move {
let mut sender = ws_sender;
?
while let Some(msg) = rx.recv().await {
let _ = sender.send(msg).await;
}
});
}websocket連接處理 let (ws_sender, mut ws_receiver) = socket.split();這里使用原生的代碼,將已經建立起來的socket進行分割,因為websocket是雙向連接,因此獲得針對這個socket的發(fā)送端(ws_sender)和接收端(ws_receiver)。
建立連接并保存
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let subscriber_id = Uuid::new_v4();
?
{
let mut subs = self.subscribers.write().await;
subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
}在這里,我們建立了個一個管道,并將subscriber的信息進行保存,這里的 mpsc::unbounded_channel::<Message>();類似于golang中的channel,他會生成一個發(fā)送者、一個接收者,當往發(fā)送者發(fā)送消息的時候,接收者會受到該消息并進行一定處理。因此我們將subscriber的發(fā)送者(tx)保存至內存里。
建立消息發(fā)送機制
tokio::task::spawn(async move {
let mut sender = ws_sender;
?
while let Some(msg) = rx.recv().await {
let _ = sender.send(msg).await;
}
});這個就是很簡單了,通過如果rx收到了消息,則向websocket的subscriber進行發(fā)送。該任務是以新協(xié)程任務的方式啟動的,在后臺持續(xù)運行
建立websocket連接?;顧C制
let subscribers = Arc::clone(&self.subscribers);
tokio::task::spawn(async move {
while let Some(result) = ws_receiver.next().await {
match result {
Ok(message) => {
// 處理有效的消息
if message.is_text() {
println!(
"Received message from client: {}",
message.to_str().unwrap()
);
}
}
Err(e) => {
// 處理錯誤
eprintln!("WebSocket error: {:?}", e);
break;
}
}
}
println!("WebSocket connection closed");
subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
});這里我們仍然在后臺啟動一個守護協(xié)程,用于保活websocket連接,一旦發(fā)生了連接失效,則注銷消息發(fā)送機制,刪除subscribers緩存中的訂閱者。
消息推送機制
事件推送事件推送時候將允許調用相關事件的推送地址,向推送端發(fā)送消息。
pub async fn produce_node_event(&self, event: WatchEvent<Node>) {
self.node_broker.produce("node".to_string(), event).await;
}
pub async fn produce_task_event(&self, event: WatchEvent<Task>) {
self.task_broker.produce("task".to_string(), event).await;
}
pub async fn produce_job_event(&self, event: WatchEvent<Job>) {
self.job_broker.produce("job".to_string(), event).await;
}當收到消息的時候,不直接處理消息,而是將放入緩存隊列中(一個消息無界流)
pub async fn produce(&self, topic: Topic, event: WatchEvent<R>) {
if let Err(e) = self.event_sender.send((topic.clone(), event.clone())) {
eprintln!("Failed to send event: {}", e);
}
}事件分發(fā)同樣的。將啟動一個協(xié)程,用于從和event_sender對應的event_receiver中獲取消息,推送給訂閱者。
fn start_event_dispatcher(broker: Arc<Self>, mut event_receiver: UnboundedReceiver<(Topic, WatchEvent<R>)>) {
tokio::spawn(async move {
while let Some((topic, event)) = event_receiver.recv().await {
let event_json = serde_json::to_string(&event).unwrap();
let subscribers_list;
{
let subscribers = broker.subscribers.read().await;
subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
}
let mut invalid_subscribers = vec![];
for (id, ws_sender) in subscribers_list {
if ws_sender.send(warp::ws::Message::text(event_json.clone())).is_err() {
invalid_subscribers.push(id);
}
}
if !invalid_subscribers.is_empty() {
let mut subscribers = broker.subscribers.write().await;
if let Some(subscribers) = subscribers.get_mut(&topic) {
for id in invalid_subscribers {
subscribers.remove(&id);
}
}
}
}
});
}獲取訂閱者的列表并依次發(fā)送
如果發(fā)現(xiàn)發(fā)送失敗,則將這個訂閱者從緩存中刪除
客戶端
客戶端的代碼就是建立起來一個訂閱者關注相關事件的動態(tài)。在相應的代碼中,可以使用該方法。本方法最終返回的是一個無界流 Stream<Item = WatchEvent<R>>,用于得到服務器推送過來的事件類型
pub async fn list_and_watch<R>(api_client: &ApiClient, resource_name: &str) -> impl Stream<Item = WatchEvent<R>>
where
R: Resource + Clone + DeserializeOwned + 'static + Send,
{
// 先通過 HTTP 獲取資源列表
let initial_resources = get_resource_list::<R>(api_client).await;
// 解析要連接WebSocket服務器的URL
let url = Url::parse(&*format!("{}/{}", api_client.watch_url, resource_name)).expect("Invalid URL");
// 連接到WebSocket服務器
println!("watch url is {}", url);
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("Watch client connected");
let (mut write, read) = ws_stream.split();
let (tx, rx) = mpsc::unbounded_channel();
// 先發(fā)送初始資源列表
match initial_resources {
Ok(res) => tx.send(WatchEvent::Restarted(res)).unwrap(),
Err(e) => eprintln!("list resource failed, {}", e),
};
// 將 WebSocket 讀流轉換為消息事件流
tokio::spawn(async move {
read.for_each(|message| async {
match message {
Ok(msg) => {
if msg.is_text() {
let text = msg.to_text().unwrap();
match serde_json::from_str::<WatchEvent<R>>(text) {
Ok(event) => {
tx.send(event).unwrap();
}
Err(e) => {
eprintln!("Failed to parse message: {:?}", e);
}
}
}
}
Err(e) => {
eprintln!("Error receiving message: {:?}", e);
}
}
}).await;
});
// 保持 WebSocket 連接活躍
tokio::spawn(async move {
loop {
if let Err(e) = write.send(WatchMessage::Text(String::new())).await {
eprintln!("Error sending ping: {:?}", e);
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
});
tokio_stream::wrappers::UnboundedReceiverStream::new(rx)
}使用驗證
不足分析
經過上面的介紹,我們可以看到這個基礎的list and watch機制能夠正確運行。但是,和K8S、ETCD中廣泛使用的list and watch相比仍然缺少一個機制來保證list和watch的一致性。
請考慮這樣一種情況我們的服務器中會源源不斷地產生數據d1,d2,d3,...,dn。當我們使用list時候,能夠感知到d1,d2,d3,此時我們完成list,開始建立watch。加入在開始建立watch這個階段,即使可能是幾毫秒的時間但服務器生成了d4,而在watch建立起來后,只能接收到d5,d6,...。這就導致了數據的遺失。
在 Kubernetes 中,List 和 Watch 操作結合使用時,需要使用一個revision機制以確保資源的變更不會被遺漏。理解 List 和 Watch 操作時 revision(即 resourceVersion)的具體含義和管理方式對于保證一致性至關重要。revision的存在有著如下的意義:
- 數據版本控制:
revision是 Etcd 的全局遞增計數器,用于標識數據的當前版本。當進行數據的修改、更新操作時候,revision會+1 - 一致性視圖:確保返回的數據是一致的快照視圖,表示在該
revision之前的所有操作都已完成。
revision 與 List 和 Watch 的關系
- List 操作:
- 返回資源列表和當前的全局
revision,作為resourceVersion。 - 確保獲取到的資源是該
revision時刻的一致視圖。
- 返回資源列表和當前的全局
- Watch 操作:
- 使用 List
操作返回的resourceVersion` 作為起點。 - 從該 resourceVersion
開始監(jiān)聽資源的變化,確保在List和Watch` 之間的變更不會丟失。
- 使用 List
List 操作的 revision
當進行 List 操作時,Kubernetes API Server 從 Etcd 獲取當前資源的狀態(tài)及其resourceVersion 。這個 resourceVersion 是 Etcd 當前的全局revision 。它表示在此 revision 之前的所有操作都已經完成,并確保返回的數據是這個revision` 時刻的一致視圖。
Watch 操作的 revision
Watch 操作使用 List 操作返回的 resourceVersion 作為起點,從該版本開始監(jiān)聽資源的變化。這確保了從 List 到 Watch 之間的變更不會被遺漏。
示例流程
- List 操作:
- API Server 從 Etcd 獲取指定資源的當前狀態(tài)。
- Etcd 返回包含所有資源對象的列表和一個全局 revision
,這個revision將作為resourceVersion`。
- Watch 操作:
- API Server 使用
List操作返回的resourceVersion(revision) 作為起點,開始監(jiān)聽資源的變化。 - Etcd 返回從指定 revision` 開始的所有變更事件。
- API Server 使用
總結
revision:標識數據版本,確保數據一致性。List和Watch:List獲取資源和revision,Watch從該revision開始監(jiān)聽變化,確保變更的連續(xù)性和一致性。
到此這篇關于使用 Rust 實現(xiàn)的基礎的List 和 Watch 機制的文章就介紹到這了,更多相關Rust List 和 Watch 機制內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
關于使用rust調用c++靜態(tài)庫并編譯nodejs包的問題
這篇文章主要介紹了使用rust調用c++靜態(tài)庫并編譯nodejs包的問題,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-08-08

