ZooKeeper 實現(xiàn)分布式鎖的方法示例
ZooKeeper 是一個典型的分布式數(shù)據(jù)一致性解決方案,分布式應用程序可以基于 ZooKeeper 實現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱、負載均衡、分布式協(xié)調/通知、集群管理、Master 選舉、分布式鎖等功能。
節(jié)點
在介紹 ZooKeeper 分布式鎖前需要先了解一下 ZooKeeper 中節(jié)點(Znode),ZooKeeper 的數(shù)據(jù)存儲數(shù)據(jù)模型是一棵樹(Znode Tree),由斜杠(/)的進行分割的路徑,就是一個 Znode(如 /locks/my_lock)。每個 Znode 上都會保存自己的數(shù)據(jù)內容,同時還會保存一系列屬性信息。
Znode 又分為以下四種類型:
類型 | 描述 |
---|---|
持久節(jié)點 | 節(jié)點創(chuàng)建后,會一直存在,不會因客戶端會話失效而刪除 |
持久順序節(jié)點 | 基本特性與持久節(jié)點一致,創(chuàng)建節(jié)點的過程中,ZooKeeper 會在其名字后自動追加一個單調增長的數(shù)字后綴,作為新的節(jié)點名 |
臨時節(jié)點 | 客戶端會話失效或連接關閉后,該節(jié)點會被自動刪除 |
臨時順序節(jié)點 | 基本特性與臨時節(jié)點一致,創(chuàng)建節(jié)點的過程中,ZooKeeper 會在其名字后自動追加一個單調增長的數(shù)字后綴,作為新的節(jié)點名 |
鎖原理
ZooKeeper 分布式鎖是基于 臨時順序節(jié)點 來實現(xiàn)的,鎖可理解為 ZooKeeper 上的一個節(jié)點,當需要獲取鎖時,就在這個鎖節(jié)點下創(chuàng)建一個臨時順序節(jié)點。當存在多個客戶端同時來獲取鎖,就按順序依次創(chuàng)建多個臨時順序節(jié)點,但只有排列序號是第一的那個節(jié)點能獲取鎖成功,其他節(jié)點則按順序分別監(jiān)聽前一個節(jié)點的變化,當被監(jiān)聽者釋放鎖時,監(jiān)聽者就可以馬上獲得鎖。
而且用臨時順序節(jié)點的另外一個用意是如果某個客戶端創(chuàng)建臨時順序節(jié)點后,自己意外宕機了也沒關系,ZooKeeper 感知到某個客戶端宕機后會自動刪除對應的臨時順序節(jié)點,相當于自動釋放鎖。
如上圖:ClientA 和 ClientB 同時想獲取鎖,所以都在 locks 節(jié)點下創(chuàng)建了一個臨時節(jié)點 1 和 2,而 1 是當前 locks 節(jié)點下排列序號第一的節(jié)點,所以 ClientA 獲取鎖成功,而 ClientB 處于等待狀態(tài),這時 ZooKeeper 中的 2 節(jié)點會監(jiān)聽 1 節(jié)點,當 1節(jié)點鎖釋放(節(jié)點被刪除)時,2 就變成了 locks 節(jié)點下排列序號第一的節(jié)點,這樣 ClientB 就獲取鎖成功了。
代碼測試
請確保 ZooKeeper 服務已啟動,ZooKeeper 的搭建可參考Kafka 集群 中的 ZooKeeper 集群部分
以下是基于 C# 的測試,Java 可使用 Curator 框架,實現(xiàn)原理和上面描述是一致的,有興趣可以看看源碼,應該也不難理解。
創(chuàng)建 .NET Core 控制臺程序 Nuget
創(chuàng)建 ZooKeeper Client
private const int CONNECTION_TIMEOUT = 50000; private const string CONNECTION_STRING = "127.0.0.1:2181"; private ZooKeeper CreateClient() { var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance); Stopwatch sw = new Stopwatch(); sw.Start(); while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT) { var state = zooKeeper.getState(); if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING) { break; } } sw.Stop(); return zooKeeper; } class NullWatcher : Watcher { public static readonly NullWatcher Instance = new NullWatcher(); private NullWatcher() { } public override Task process(WatchedEvent @event) { return Task.CompletedTask; } }
添加 Lock 方法
/// <summary> /// 加鎖 /// </summary> /// <param name="key">加鎖的節(jié)點名</param> /// <param name="lockAcquiredAction">加鎖成功后需要執(zhí)行的邏輯</param> /// <param name="lockReleasedAction">鎖釋放后需要執(zhí)行的邏輯,可為空</param> /// <returns></returns> public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null) { // 獲取 ZooKeeper Client ZooKeeper keeper = CreateClient(); // 指定鎖節(jié)點 WriteLock writeLock = new WriteLock(keeper, $"/{key}", null); var lockCallback = new LockCallback(() => { lockAcquiredAction.Invoke(); writeLock.unlock(); }, lockReleasedAction); // 綁定鎖獲取和釋放的監(jiān)聽對象 writeLock.setLockListener(lockCallback); // 獲取鎖(獲取失敗時會監(jiān)聽上一個臨時節(jié)點) await writeLock.Lock(); } class LockCallback : LockListener { private readonly Action _lockAcquiredAction; private readonly Action _lockReleasedAction; public LockCallback(Action lockAcquiredAction, Action lockReleasedAction) { _lockAcquiredAction = lockAcquiredAction; _lockReleasedAction = lockReleasedAction; } /// <summary> /// 獲取鎖成功回調 /// </summary> /// <returns></returns> public Task lockAcquired() { _lockAcquiredAction?.Invoke(); return Task.FromResult(0); } /// <summary> /// 釋放鎖成功回調 /// </summary> /// <returns></returns> public Task lockReleased() { _lockReleasedAction?.Invoke(); return Task.FromResult(0); } }
多線程模擬測試
static async Task RunAsync() { Parallel.For(1, 10, async (i) => { await new ZooKeeprDistributedLock().Lock("locks", () => { Console.WriteLine($"第{i}個請求,獲取鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}"); Thread.Sleep(1000); // 業(yè)務邏輯... }, () => { Console.WriteLine($"第{i}個請求,釋放鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine("-------------------------------"); }); }); await Task.CompletedTask; }
雖然模擬的是多線程并行執(zhí)行,但最終都會依賴鎖的獲取和釋放而串行執(zhí)行實際業(yè)務邏輯。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
c# 重載WndProc,實現(xiàn)重寫“最小化”的實現(xiàn)方法
在做“亦歌桌面版”的時候,發(fā)現(xiàn)當打開歌詞狀態(tài)下,用最小化隱藏窗體到托盤的話(如下code #1),在調出發(fā)現(xiàn)歌詞縮小了(雖然顯現(xiàn)的窗體大小跟剛才一樣),從這點看調用該方法其實窗體大小是改變了的(這個過程只是不可視而已)。2009-02-02C# Xamarin利用ZXing.Net.Mobile進行掃碼的方法
這篇文章主要介紹了C# Xamarin利用ZXing.Net.Mobile進行掃碼的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-06-06C#操作Windows服務類System.ServiceProcess.ServiceBase
這篇文章介紹了C#操作Windows服務類System.ServiceProcess.ServiceBase,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-05-05