ASP.NET Core 3.x 并發(fā)限制的實(shí)現(xiàn)代碼
前言
Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于傳入的請(qǐng)求進(jìn)行排隊(duì)處理,避免線程池的不足.
我們?nèi)粘i_發(fā)中可能常做的給某web服務(wù)器配置連接數(shù)以及,請(qǐng)求隊(duì)列大小,那么今天我們看看如何在通過中間件形式實(shí)現(xiàn)一個(gè)并發(fā)量以及隊(duì)列長(zhǎng)度限制.
Queue策略
添加Nuget
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services)
{
services.AddQueuePolicy(options =>
{
//最大并發(fā)請(qǐng)求數(shù)
options.MaxConcurrentRequests = 2;
//請(qǐng)求隊(duì)列長(zhǎng)度限制
options.RequestQueueLimit = 1;
});
services.AddControllers();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
//添加并發(fā)限制中間件
app.UseConcurrencyLimiter();
app.Run(async context =>
{
Task.Delay(100).Wait(); // 100ms sync-over-async
await context.Response.WriteAsync("Hello World!");
});
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
通過上面簡(jiǎn)單的配置,我們就可以將他引入到我們的代碼中,從而做并發(fā)量限制,以及隊(duì)列的長(zhǎng)度;那么問題來了,他是怎么實(shí)現(xiàn)的呢?
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
services.Configure(configure);
services.AddSingleton<IQueuePolicy, QueuePolicy>();
return services;
}
QueuePolicy采用的是SemaphoreSlim信號(hào)量設(shè)計(jì),SemaphoreSlim、Semaphore(信號(hào)量)支持并發(fā)多線程進(jìn)入被保護(hù)代碼,對(duì)象在初始化時(shí)會(huì)指定 最大任務(wù)數(shù)量,當(dāng)線程請(qǐng)求訪問資源,信號(hào)量遞減,而當(dāng)他們釋放時(shí),信號(hào)量計(jì)數(shù)又遞增。
/// <summary>
/// 構(gòu)造方法(初始化Queue策略)
/// </summary>
/// <param name="options"></param>
public QueuePolicy(IOptions<QueuePolicyOptions> options)
{
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
if (_maxConcurrentRequests <= 0)
{
throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
}
_requestQueueLimit = options.Value.RequestQueueLimit;
if (_requestQueueLimit < 0)
{
throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
}
//使用SemaphoreSlim來限制任務(wù)最大個(gè)數(shù)
_serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
}
ConcurrencyLimiterMiddleware中間件
/// <summary>
/// Invokes the logic of the middleware.
/// </summary>
/// <param name="context">The <see cref="HttpContext"/>.</param>
/// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
public async Task Invoke(HttpContext context)
{
var waitInQueueTask = _queuePolicy.TryEnterAsync();
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
bool result;
if (waitInQueueTask.IsCompleted)
{
ConcurrencyLimiterEventSource.Log.QueueSkipped();
result = waitInQueueTask.Result;
}
else
{
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
{
result = await waitInQueueTask;
}
}
if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
}
每次當(dāng)我們請(qǐng)求的時(shí)候首先會(huì)調(diào)用_queuePolicy.TryEnterAsync(),進(jìn)入該方法后先開啟一個(gè)私有l(wèi)ock鎖,再接著判斷總請(qǐng)求量是否≥(請(qǐng)求隊(duì)列限制的大小+最大并發(fā)請(qǐng)求數(shù)),如果當(dāng)前數(shù)量超出了,那么我直接拋出,送你個(gè)503狀態(tài);
if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
問題來了,我這邊如果說還沒到你設(shè)置的大小呢,我這個(gè)請(qǐng)求沒有給你服務(wù)器造不成壓力,那么你給我處理一下吧.
await _serverSemaphore.WaitAsync();異步等待進(jìn)入信號(hào)量,如果沒有線程被授予對(duì)信號(hào)量的訪問權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號(hào)量被釋放為止
lock (_totalRequestsLock)
{
if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
{
return false;
}
TotalRequests++;
}
//異步等待進(jìn)入信號(hào)量,如果沒有線程被授予對(duì)信號(hào)量的訪問權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號(hào)量被釋放為止
await _serverSemaphore.WaitAsync();
return true;
}
返回成功后那么中間件這邊再進(jìn)行處理,_queuePolicy.OnExit();通過該調(diào)用進(jìn)行調(diào)用_serverSemaphore.Release();釋放信號(hào)燈,再對(duì)總請(qǐng)求數(shù)遞減
Stack策略
再來看看另一種方法,棧策略,他是怎么做的呢?一起來看看.再附加上如何使用的代碼.
public void ConfigureServices(IServiceCollection services)
{
services.AddStackPolicy(options =>
{
//最大并發(fā)請(qǐng)求數(shù)
options.MaxConcurrentRequests = 2;
//請(qǐng)求隊(duì)列長(zhǎng)度限制
options.RequestQueueLimit = 1;
});
services.AddControllers();
}
通過上面的配置,我們便可以對(duì)我們的應(yīng)用程序執(zhí)行出相應(yīng)的策略.下面再來看看他是怎么實(shí)現(xiàn)的呢
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
services.Configure(configure);
services.AddSingleton<IQueuePolicy, StackPolicy>();
return services;
}
可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法
/// <summary>
/// 構(gòu)造方法(初始化參數(shù))
/// </summary>
/// <param name="options"></param>
public StackPolicy(IOptions<QueuePolicyOptions> options)
{
//棧分配
_buffer = new List<ResettableBooleanCompletionSource>();
//隊(duì)列大小
_maxQueueCapacity = options.Value.RequestQueueLimit;
//最大并發(fā)請(qǐng)求數(shù)
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
//剩余可用空間
_freeServerSpots = options.Value.MaxConcurrentRequests;
}
當(dāng)我們通過中間件請(qǐng)求調(diào)用,_queuePolicy.TryEnterAsync()時(shí),首先會(huì)判斷我們是否還有訪問請(qǐng)求次數(shù),如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執(zhí)行下一步,如果當(dāng)前隊(duì)列=我們?cè)O(shè)置的隊(duì)列大小的話,那我們需要取消先前請(qǐng)求;每次取消都是先取消之前的保留后面的請(qǐng)求;
public ValueTask<bool> TryEnterAsync()
{
lock (_bufferLock)
{
if (_freeServerSpots > 0)
{
_freeServerSpots--;
return _trueTask;
}
// 如果隊(duì)列滿了,取消先前的請(qǐng)求
if (_queueLength == _maxQueueCapacity)
{
_hasReachedCapacity = true;
_buffer[_head].Complete(false);
_queueLength--;
}
var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
_cachedResettableTCS = null;
if (_hasReachedCapacity || _queueLength < _buffer.Count)
{
_buffer[_head] = tcs;
}
else
{
_buffer.Add(tcs);
}
_queueLength++;
// increment _head for next time
_head++;
if (_head == _maxQueueCapacity)
{
_head = 0;
}
return tcs.GetValueTask();
}
}
當(dāng)我們請(qǐng)求后調(diào)用_queuePolicy.OnExit();出棧,再將請(qǐng)求長(zhǎng)度遞減
public void OnExit()
{
lock (_bufferLock)
{
if (_queueLength == 0)
{
_freeServerSpots++;
if (_freeServerSpots > _maxConcurrentRequests)
{
_freeServerSpots--;
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
}
return;
}
// step backwards and launch a new task
if (_head == 0)
{
_head = _maxQueueCapacity - 1;
}
else
{
_head--;
}
//退出,出棧
_buffer[_head].Complete(true);
_queueLength--;
}
}
總結(jié)
基于棧結(jié)構(gòu)的特點(diǎn),在實(shí)際應(yīng)用中,通常只會(huì)對(duì)棧執(zhí)行以下兩種操作:
- 向棧中添加元素,此過程被稱為"進(jìn)棧"(入棧或壓棧);
- 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);
隊(duì)列存儲(chǔ)結(jié)構(gòu)的實(shí)現(xiàn)有以下兩種方式:
- 順序隊(duì)列:在順序表的基礎(chǔ)上實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu);
- 鏈隊(duì)列:在鏈表的基礎(chǔ)上實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu);
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
在IIS上重新注冊(cè).NET Framework 2.0的命令和參數(shù)詳解
這篇文章主要介紹了在IIS上重新注冊(cè).NET Framework 2.0的命令和參數(shù)詳解,但其它.NET Framework 版本沒有測(cè)試,需要的朋友可以參考下2014-07-07
asp.net下實(shí)現(xiàn)輸入數(shù)字的冒泡排序
.net下實(shí)現(xiàn)輸入數(shù)字的冒泡排序2010-03-03
C# javaScript函數(shù)的相互調(diào)用
如何在JavaScript訪問C#函數(shù),如何在C#中訪問JavaScript的已有變量等實(shí)現(xiàn)方法2008-12-12
WPF使用代碼創(chuàng)建數(shù)據(jù)模板DataTemplate
本文詳細(xì)講解了WPF使用代碼創(chuàng)建數(shù)據(jù)模板DataTemplate的方法,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-02-02
.net生成縮略圖及水印圖片時(shí)出現(xiàn)GDI+中發(fā)生一般性錯(cuò)誤解決方法
這篇文章主要介紹了.net生成縮略圖及水印圖片時(shí)出現(xiàn)GDI+中發(fā)生一般性錯(cuò)誤解決方法 ,需要的朋友可以參考下2014-11-11
asp.net core項(xiàng)目授權(quán)流程詳解
本文詳細(xì)講解了asp.net core項(xiàng)目的授權(quán)流程,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-09-09

