C# 線程安全詳解
介紹
在 .NET4.0 之前,如果我們需要在多線程環(huán)境下使用 Dictionary 類,除了自己實現(xiàn)線程同步來保證線程安全外,我們沒有其他選擇。很多開發(fā)人員肯定都實現(xiàn)過類似的線程安全方案,可能是通過創(chuàng)建全新的線程安全字典,或者僅是簡單的用一個類封裝一個 Dictionary 對象,并在所有方法中加上鎖機制,我們稱這種方案叫 “Dictionary+Locks” 。
System.Collections.Concurrent 命名空間下提供多個線程安全集合類,只要多個線程同時訪問集合,就應使用這些類來代替 System.Collections 和 System.Collections.Generic 命名空間中的相應類型。 但是,不保證通過擴展方法或通過顯式接口實現(xiàn)訪問集合對象是線程安全的,可能需要由調用方進行同步。
經典生產消費問題
介紹
這個問題是最為經典的多線程應用問題問題就是:有一個或多個線程(生產者線程)產生一些數(shù)據,還有一個或者多個線程(消費者線程)要取出這些數(shù)據并執(zhí)行一些相應的工作。
Queue
接下來,我們是使用程序去描述這個問題,看下面代碼
static void Main(string[] args) { int count = 0; // 臨界資源區(qū) var queue = new Queue<string>(); // 生產者線程 Task.Factory.StartNew(() => { while (true) { queue.Enqueue("mesg" + count); count++; } }); // 消費者線程1 Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { string value = queue.Dequeue(); Console.WriteLine("Worker A: " + value); } } }); // 消費者線程2 Task.Factory.StartNew(() => { while (true) { if (queue.Count > 0) { string value = queue.Dequeue(); Console.WriteLine("Worker B: " + value); } } }); Thread.Sleep(50000); }
我們使用 Queue 模擬了一個簡單的資源池,一個生產者放數(shù)據,兩個消費者消費數(shù)據。
這個程序運行以后會產生異常,異常的原因很簡單。當某時刻,第一個消費者判斷 queue.Count > 0 為true 時,就會到 Queue 中取數(shù)據。但是,此時這個數(shù)據可能會被第二個消費者拿走了,因為第二個消費者也判斷出此時有數(shù)據可取。第一個消費者取取數(shù)據時就會發(fā)生異常,這就是一個簡單的臨界資源線程安全問題。
知道問題了,那么如何解決呢?有兩種方案,接下來進行講解
ConcurrentQueue
1 . 加鎖
這個方案是可行的,很多時候我們也是這么做的,包括微軟早期實現(xiàn)線程安全的 ArrayList 和 Hashtable 內部 (Synchronized方法) 也是這么實現(xiàn)的。這個方案適用于只有少量的消費者,并且每個消費者都會執(zhí)行大量操作的時候,這時 lock 并沒什么太大問題,但是,如果是大批量短小精悍的消費者存在的話,lock 會嚴重影響代碼的執(zhí)行效率。
2 . 線程安全的集合區(qū)
這個就是 .NET4.0 后 System.Collections.Concurrent 命名空間下提供多個線程安全集合類方案。
新的線程安全的這些集合內部不再使用lock機制這種比較低效的方式去實現(xiàn)線程安全,而是轉而使用SpinWait 和 Interlocked 等機制,間接實現(xiàn)了線程安全,這種方式的效率要高于使用lock的方式。
var queue = new ConcurrentQueue<string>(); Task.Factory.StartNew(() => { while (true) { queue.Enqueue("msg" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker A: " + value); } } }); Task.Factory.StartNew(() => { while (true) { string value; if (queue.TryDequeue(out value)) { Console.WriteLine("Worker B: " + value); } } });
ConcurrentQueue.TryDequeue(T) 方法會嘗試獲取消費,那能不能不要去判斷集合是否為空,集合當自己沒有元素的時候自己 Block 一下可以嗎?答案是,可以的
BlockingCollection
針對上面的問題,我們可以使用 BlockingCollection 即可。接下來我來看
var blockingCollection = new BlockingCollection<string>(); Task.Factory.StartNew(() => { while (true) { blockingCollection.Add("msg" + count); count++; } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker A: " + blockingCollection.Take()); } }); Task.Factory.StartNew(() => { while (true) { Console.WriteLine("Worker B: " + blockingCollection.Take()); } });
BlockingCollection 集合是一個擁有阻塞功能的集合,它就是完成了經典生產者消費者的算法功能。它沒有實現(xiàn)底層的存儲結構,而是使用了實現(xiàn) IProducerConsumerCollection 接口的幾個集合作為底層的數(shù)據結構,例如 ConcurrentBag, ConcurrentStack 或者是 ConcurrentQueue。你可以在構造BlockingCollection 實例的時候傳入這個參數(shù),如果不指定的話,則默認使用 ConcurrentQueue 作為存儲結構。
而對于生產者來說,只需要通過調用其Add方法放數(shù)據,消費者只需要調用Take方法來取數(shù)據就可以了。
當然了上面的消費者代碼中還有一點是讓人不爽的,那就是 while 語句,可以更優(yōu)雅一點嗎?答案是,可以的。
Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker A: " + value); } });
BlockingCollection.GetConsumingEnumerable 方法是關鍵,這個方法會遍歷集合取出數(shù)據,一旦發(fā)現(xiàn)集合空了,則阻塞自己,直到集合中又有元素了再開始遍歷。
此時,完美了解決了生產者消費者問題。然而通常來說,還有下面兩個問題我們有時需要去控制
1 . 控制集合中數(shù)據的最大數(shù)量
這個問題由 BlockingCollection 構造函數(shù)解決,構造該對象實例的時候,構造函數(shù)中的 BoundedCapacity 決定了集合最大的可容納數(shù)據數(shù)量,這個比較簡單。
2 . 何時停止的問題
這個問題由 CompleteAdding 和 IsCompleted 兩個配合解決。CompleteAdding 方法是直接不允許任何元素被加入集合;當使用了 CompleteAdding 方法后且集合內沒有元素的時候,另一個屬性 IsCompleted 此時會為 True,這個屬性可以用來判斷是否當前集合內的所有元素都被處理完。生產者修改后的代碼:
Task.Factory.StartNew(() => { for (int count = 0; count < 10; count++) { blockingCollection.Add("msg" + count); } blockingCollection.CompleteAdding(); });
當使用了 CompleteAdding 方法后,對象停止往集合中添加數(shù)據,這時如果是使用 GetConsumingEnumerable 枚舉的,那么這種枚舉會自然結束,不會再 Block 住集合,這種方式最優(yōu)雅,也是推薦的寫法。
但是如果是使用 TryTake 訪問元素的,則需要使用 IsCompleted 判斷一下,因為這個時候使用 TryTake 會拋InvalidOperationException 異常。接著我們看下最后的完整代碼:
static void Main(string[] args) { var blockingCollection = new BlockingCollection<string>(); var producer = Task.Factory.StartNew(() => { for (int count = 0; count < 10; count++) { blockingCollection.Add("msg" + count); Thread.Sleep(300); } blockingCollection.CompleteAdding(); }); var consumer1 = Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker A: " + value); } }); var consumer2 = Task.Factory.StartNew(() => { foreach (string value in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("Worker B: " + value); } }); Task.WaitAll(producer, consumer1, consumer2); }
BlockingCollection 枚舉
此外,需要注意 BlockingCollection 有兩種枚舉方法,
1 . foreach
首先 BlockingCollection 本身繼承自IEnumerable,所以它自己就可以被 foreach 枚舉,首先 BlockingCollection 包裝了一個線程安全集合,那么它自己也是線程安全的,而當多個線程在同時修改或訪問線程安全容器時,BlockingCollection 自己作為 IEnumerable 會返回一個一定時間內的集合片段,也就是只會枚舉在那個時間點上內部集合的元素。使用這種方式枚舉的時候,不會有 Block 效果。
2 . GetConsumingEnumerable
另外一種方式就是我們上面使用的 GetConsumingEnumerable 方式的枚舉,這種方式會有 Block 效果,直到 CompleteAdding 被調用為止。
BlockingCollection 擴展
實現(xiàn) IProducerConsumerCollection 接口的幾個集合:ConcurrentBag (線程安全的無序的元素集合), ConcurrentStack (線程安全的堆棧) 和 ConcurrentQueue (線程安全的隊列)。這些都很簡單,功能與非線程安全的那些集合都一樣,只不過是多了 TryXXX 方法,多線程環(huán)境下使用這些方法就好了。
System.Collections.Concurrent
System.Collections.Concurrent 下面還有一些其他與多線程相關的集合,有些個類在原來的基礎上也添加了一下新的方法,例如:AddOrUpdate,GetOrAdd,TryXXX 等等,都很容易理解。
總結
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關注腳本之家的更多內容!
相關文章
C#基礎:Dispose()、Close()、Finalize()的區(qū)別詳解
本篇文章是對c#中的Dispose()、Close()、Finalize()的區(qū)別進行了詳細的分析介紹,需要的朋友參考下2013-05-05WindowsForm實現(xiàn)TextBox占位符Placeholder提示功能
這篇文章主要介紹了WindowsForm實現(xiàn)TextBox占位符Placeholder提示,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07