C#多線程編程中的鎖系統(tǒng)(四):自旋鎖
目錄
一:基礎(chǔ)
二:自旋鎖示例
三:SpinLock
四:繼續(xù)SpinLock
五:總結(jié)
一:基礎(chǔ)
內(nèi)核鎖:基于內(nèi)核對象構(gòu)造的鎖機制,就是通常說的內(nèi)核構(gòu)造模式。用戶模式構(gòu)造和內(nèi)核模式構(gòu)造
優(yōu)點:cpu利用最大化。它發(fā)現(xiàn)資源被鎖住,請求就排隊等候。線程切換到別處干活,直到接受到可用信號,線程再切回來繼續(xù)處理請求。
缺點:托管代碼->用戶模式代碼->內(nèi)核代碼損耗、線程上下文切換損耗。
在鎖的時間比較短時,系統(tǒng)頻繁忙于休眠、切換,是個很大的性能損耗。
自旋鎖:原子操作+自循環(huán)。通常說的用戶構(gòu)造模式。 線程不休眠,一直循環(huán)嘗試對資源訪問,直到可用。
優(yōu)點:完美解決內(nèi)核鎖的缺點。
缺點:長時間一直循環(huán)會導(dǎo)致cpu的白白浪費,高并發(fā)競爭下、CPU的消耗特別嚴(yán)重。
混合鎖:內(nèi)核鎖+自旋鎖。 混合鎖是先自旋鎖一段時間或自旋多少次,再轉(zhuǎn)成內(nèi)核鎖。
優(yōu)點:內(nèi)核鎖和自旋鎖的折中方案,利用前二者優(yōu)點,避免出現(xiàn)極端情況(自旋時間過長,內(nèi)核鎖時間過短)。
缺點: 自旋多少時間、自旋多少次,這些策略很難把控。
ps:操作系統(tǒng)或net框架,這塊算法策略做的已經(jīng)非常優(yōu)了,有些API函數(shù)也提供了時間及次數(shù)可配置項,讓開發(fā)者根據(jù)需求自行判斷。
二:自旋鎖示例
來看下我們自己簡單實現(xiàn)的自旋鎖:
int signal = 0;
var li = new List<int>();
Parallel.For(0, 1000 * 10000, r =>
{
while (Interlocked.Exchange(ref signal, 1) != 0)//加自旋鎖
{
//黑魔法
}
li.Add(r);
Interlocked.Exchange(ref signal, 0); //釋放鎖
});
Console.WriteLine(li.Count);
//輸出:10000000
上面就是自旋鎖:Interlocked.Exchange+while
1:定義signal 0可用,1不可用。
2:Parallel模擬并發(fā)競爭,原子更改signal狀態(tài)。 后續(xù)線程自旋訪問signal,是否可用。
3:A線程使用完后,更改signal為0。 剩余線程競爭訪問資源,B線程勝利后,更改signal為1,失敗線程繼續(xù)自旋,直到可用。
三:SpinLock
SpinLock是net4.0后系統(tǒng)幫我們實現(xiàn)的自旋鎖,內(nèi)部做了優(yōu)化。
簡單看下實例:
var li = new List<int>();
var sl = new SpinLock();
Parallel.For(0, 1000 * 10000, r =>
{
bool gotLock = false; //釋放成功
sl.Enter(ref gotLock); //進入鎖
li.Add(r);
if (gotLock) sl.Exit(); //釋放
});
Console.WriteLine(li.Count);
//輸出:10000000
四:繼續(xù)SpinLock
new SpinLock(false) 這個構(gòu)造函數(shù)主要用來幫我們檢查死鎖用,true是開啟。
開啟狀態(tài)下,如果發(fā)生死鎖會直接拋異常的。
貼了一部分源碼(已折疊),我們來看下:
public void Enter(ref bool lockTaken)
{
if (lockTaken)
{
lockTaken = false;
throw new System.ArgumentException(Environment.GetResourceString("SpinLock_TryReliableEnter_ArgumentException"));
}
// Fast path to acquire the lock if the lock is released
// If the thread tracking enabled set the new owner to the current thread id
// Id not, set the anonymous bit lock
int observedOwner = m_owner;
int newOwner = 0;
bool threadTrackingEnabled = (m_owner & LOCK_ID_DISABLE_MASK) == 0;
if (threadTrackingEnabled)
{
if (observedOwner == LOCK_UNOWNED)
newOwner = Thread.CurrentThread.ManagedThreadId;
}
else if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED)
{
newOwner = observedOwner | LOCK_ANONYMOUS_OWNED; // set the lock bit
}
if (newOwner != 0)
{
#if !FEATURE_CORECLR
Thread.BeginCriticalRegion();
#endif
#if PFX_LEGACY_3_5
if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner)
{
lockTaken = true;
return;
}
#else
if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner)
{
// Fast path succeeded
return;
}
#endif
#if !FEATURE_CORECLR
Thread.EndCriticalRegion();
#endif
}
//Fast path failed, try slow path
ContinueTryEnter(Timeout.Infinite, ref lockTaken);
}
private void ContinueTryEnter(int millisecondsTimeout, ref bool lockTaken)
{
long startTicks = 0;
if (millisecondsTimeout != Timeout.Infinite && millisecondsTimeout != 0)
{
startTicks = DateTime.UtcNow.Ticks;
}
#if !FEATURE_PAL && !FEATURE_CORECLR // PAL doesn't support eventing, and we don't compile CDS providers for Coreclr
if (CdsSyncEtwBCLProvider.Log.IsEnabled())
{
CdsSyncEtwBCLProvider.Log.SpinLock_FastPathFailed(m_owner);
}
#endif
if (IsThreadOwnerTrackingEnabled)
{
// Slow path for enabled thread tracking mode
ContinueTryEnterWithThreadTracking(millisecondsTimeout, startTicks, ref lockTaken);
return;
}
// then thread tracking is disabled
// In this case there are three ways to acquire the lock
// 1- the first way the thread either tries to get the lock if it's free or updates the waiters, if the turn >= the processors count then go to 3 else go to 2
// 2- In this step the waiter threads spins and tries to acquire the lock, the number of spin iterations and spin count is dependent on the thread turn
// the late the thread arrives the more it spins and less frequent it check the lock avilability
// Also the spins count is increaes each iteration
// If the spins iterations finished and failed to acquire the lock, go to step 3
// 3- This is the yielding step, there are two ways of yielding Thread.Yield and Sleep(1)
// If the timeout is expired in after step 1, we need to decrement the waiters count before returning
int observedOwner;
//***Step 1, take the lock or update the waiters
// try to acquire the lock directly if possoble or update the waiters count
SpinWait spinner = new SpinWait();
while (true)
{
observedOwner = m_owner;
if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED)
{
#if !FEATURE_CORECLR
Thread.BeginCriticalRegion();
#endif
#if PFX_LEGACY_3_5
if (Interlocked.CompareExchange(ref m_owner, observedOwner | 1, observedOwner) == observedOwner)
{
lockTaken = true;
return;
}
#else
if (Interlocked.CompareExchange(ref m_owner, observedOwner | 1, observedOwner, ref lockTaken) == observedOwner)
{
return;
}
#endif
#if !FEATURE_CORECLR
Thread.EndCriticalRegion();
#endif
}
else //failed to acquire the lock,then try to update the waiters. If the waiters count reached the maximum, jsut break the loop to avoid overflow
if ((observedOwner & WAITERS_MASK) == MAXIMUM_WAITERS || Interlocked.CompareExchange(ref m_owner, observedOwner + 2, observedOwner) == observedOwner)
break;
spinner.SpinOnce();
}
// Check the timeout.
if (millisecondsTimeout == 0 ||
(millisecondsTimeout != Timeout.Infinite &&
TimeoutExpired(startTicks, millisecondsTimeout)))
{
DecrementWaiters();
return;
}
//***Step 2. Spinning
//lock acquired failed and waiters updated
int turn = ((observedOwner + 2) & WAITERS_MASK) / 2;
int processorCount = PlatformHelper.ProcessorCount;
if (turn < processorCount)
{
int processFactor = 1;
for (int i = 1; i <= turn * SPINNING_FACTOR; i++)
{
Thread.SpinWait((turn + i) * SPINNING_FACTOR * processFactor);
if (processFactor < processorCount)
processFactor++;
observedOwner = m_owner;
if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED)
{
#if !FEATURE_CORECLR
Thread.BeginCriticalRegion();
#endif
int newOwner = (observedOwner & WAITERS_MASK) == 0 ? // Gets the number of waiters, if zero
observedOwner | 1 // don't decrement it. just set the lock bit, it is zzero because a previous call of Exit(false) ehich corrupted the waiters
: (observedOwner - 2) | 1; // otherwise decrement the waiters and set the lock bit
Contract.Assert((newOwner & WAITERS_MASK) >= 0);
#if PFX_LEGACY_3_5
if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner)
{
lockTaken = true;
return;
}
#else
if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner)
{
return;
}
#endif
#if !FEATURE_CORECLR
Thread.EndCriticalRegion();
#endif
}
}
}
// Check the timeout.
if (millisecondsTimeout != Timeout.Infinite && TimeoutExpired(startTicks, millisecondsTimeout))
{
DecrementWaiters();
return;
}
//*** Step 3, Yielding
//Sleep(1) every 50 yields
int yieldsoFar = 0;
while (true)
{
observedOwner = m_owner;
if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED)
{
#if !FEATURE_CORECLR
Thread.BeginCriticalRegion();
#endif
int newOwner = (observedOwner & WAITERS_MASK) == 0 ? // Gets the number of waiters, if zero
observedOwner | 1 // don't decrement it. just set the lock bit, it is zzero because a previous call of Exit(false) ehich corrupted the waiters
: (observedOwner - 2) | 1; // otherwise decrement the waiters and set the lock bit
Contract.Assert((newOwner & WAITERS_MASK) >= 0);
#if PFX_LEGACY_3_5
if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner)
{
lockTaken = true;
return;
}
#else
if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner)
{
return;
}
#endif
#if !FEATURE_CORECLR
Thread.EndCriticalRegion();
#endif
}
if (yieldsoFar % SLEEP_ONE_FREQUENCY == 0)
{
Thread.Sleep(1);
}
else if (yieldsoFar % SLEEP_ZERO_FREQUENCY == 0)
{
Thread.Sleep(0);
}
else
{
#if PFX_LEGACY_3_5
Platform.Yield();
#else
Thread.Yield();
#endif
}
if (yieldsoFar % TIMEOUT_CHECK_FREQUENCY == 0)
{
//Check the timeout.
if (millisecondsTimeout != Timeout.Infinite && TimeoutExpired(startTicks, millisecondsTimeout))
{
DecrementWaiters();
return;
}
}
yieldsoFar++;
}
}
/// <summary>
/// decrements the waiters, in case of the timeout is expired
/// </summary>
private void DecrementWaiters()
{
SpinWait spinner = new SpinWait();
while (true)
{
int observedOwner = m_owner;
if ((observedOwner & WAITERS_MASK) == 0) return; // don't decrement the waiters if it's corrupted by previous call of Exit(false)
if (Interlocked.CompareExchange(ref m_owner, observedOwner - 2, observedOwner) == observedOwner)
{
Contract.Assert(!IsThreadOwnerTrackingEnabled); // Make sure the waiters never be negative which will cause the thread tracking bit to be flipped
break;
}
spinner.SpinOnce();
}
}
從代碼中發(fā)現(xiàn)SpinLock并不是我們簡單的實現(xiàn)那樣一直自旋,其內(nèi)部做了很多優(yōu)化。
1:內(nèi)部使用了Interlocked.CompareExchange保持原子操作, m_owner 0可用,1不可用。
2:第一次獲得鎖失敗后,繼續(xù)調(diào)用ContinueTryEnter,ContinueTryEnter有三種獲得鎖的情況。
3:ContinueTryEnter函數(shù)第一種獲得鎖的方式。 使用了while+SpinWait,后續(xù)再講。
4:第一種方式達到最大等待者數(shù)量后,命中走第二種。 繼續(xù)自旋 turn * 100次。100這個值是處理器核數(shù)(4, 8 ,16)下最好的。
5:第二種如果還不能獲得鎖,走第三種。 這種就有點混合構(gòu)造的意味了,如下:
if (yieldsoFar % 40 == 0)
Thread.Sleep(1);
else if (yieldsoFar % 10 == 0)
Thread.Sleep(0);
else
Thread.Yield();
Thread.Sleep(1) : 終止當(dāng)前線程,放棄剩下時間片 休眠1毫秒。 退出跟其他線程搶占cpu。當(dāng)然這個一般會更多,系統(tǒng)無法保證這么細(xì)的時間粒度。
Thread.Sleep(0): 終止當(dāng)前線程,放棄剩下時間片。 但立馬還會跟其他線程搶cpu,能不能搶到跟線程優(yōu)先級有關(guān)。
Thread.Yeild(): 結(jié)束當(dāng)前線程。讓出cpu給其他準(zhǔn)備好的線程。其他線程ok后或沒有準(zhǔn)備好的線程,繼續(xù)執(zhí)行。 跟優(yōu)先級無關(guān)。
Thread.Yeild()還會返回個bool值,是否讓出成功。
從源碼中,我們可以學(xué)到不少編程技巧。 比如我們也可以使用 自旋+Thread.Yeild() 或 while+Thread.Yeild() 等組合。
五:總結(jié)
本章談了自旋鎖的基礎(chǔ)+樓主的經(jīng)驗。 SpinLock類源碼這塊,只粗淺理解了下,并沒有深究。
測了下SpinLock和自己實現(xiàn)的自旋鎖性能對比(并行添加1000w List<int>()),SpinLock是單純的自旋鎖性能2倍以上。
還測了下lock的性能,是系統(tǒng)SpinLock性能的3倍以上。 可見lock內(nèi)部自旋的效率更高,CLR暫沒開源,所以看不到CLR具體實現(xiàn)的代碼。
相關(guān)文章
C#任務(wù)并行Parellel.For和Parallel.ForEach
這篇文章介紹了C#任務(wù)并行Parellel.For和Parallel.ForEach的用法,文中通過示例代碼介紹的非常詳細(xì)。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-07-07