C#優(yōu)雅處理TCP數(shù)據(jù)的實(shí)現(xiàn)
前言
Tcp是一個(gè)面向連接的流數(shù)據(jù)傳輸協(xié)議,用人話說就是傳輸是一個(gè)已經(jīng)建立好連接的管道,數(shù)據(jù)都在管道里像流水一樣流淌到對(duì)端。
那么數(shù)據(jù)必然存在幾個(gè)問題,比如數(shù)據(jù)如何持續(xù)的讀取,數(shù)據(jù)包的邊界等。
Nagle's算法
Nagle 算法的核心思想是,在一個(gè) TCP 連接上,最多只能有一個(gè)未被確認(rèn)的小數(shù)據(jù)包(小于 MSS,即最大報(bào)文段大?。?/p>
優(yōu)勢(shì)
減少網(wǎng)絡(luò)擁塞:通過合并小數(shù)據(jù)包,減少了網(wǎng)絡(luò)中的數(shù)據(jù)包數(shù)量,降低了擁塞的可能性。
提高網(wǎng)絡(luò)效率:在低速網(wǎng)絡(luò)中,Nagle 算法可以顯著提高傳輸效率。
劣勢(shì)
增加延遲:在交互式應(yīng)用中,Nagle 算法可能導(dǎo)致顯著的延遲,因?yàn)樗却?ACK 或合并數(shù)據(jù)包。
C#中如何配置?
var _socket = new Socket(IPAddress.Any.AddressFamily, SocketType.Stream, ProtocolType.Tcp); _serverSocket.NoDelay = _options.NoDelay;
連接超時(shí)
在調(diào)用客戶端Socket連接服務(wù)器的時(shí)候,可以設(shè)置連接超時(shí)機(jī)制,具體可以傳入一個(gè)任務(wù)的取消令牌,并且設(shè)置超時(shí)時(shí)間。
CancellationTokenSource connectTokenSource = new CancellationTokenSource(); connectTokenSource.CancelAfter(3000); //3秒 await _socket.ConnectAsync(RemoteEndPoint, connectTokenSource.Token);
SSL加密傳輸
TCP使用SSL加密傳輸,通過非對(duì)稱加密的方式,利用證書,保證雙方使用了安全的密鑰加密了報(bào)文。在C#中如何配置?
服務(wù)端配置
//創(chuàng)建證書對(duì)象
var _certificate = _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);
//與客戶端進(jìn)行驗(yàn)證
if (allowingUntrustedSSLCertificate) //是否允許不受信任的證書
{
SslStream = new SslStream(NetworkStream, false,
(obj, certificate, chain, error) => true);
}
else
{
SslStream = new SslStream(NetworkStream, false);
}
try
{
//serverCertificate:用于對(duì)服務(wù)器進(jìn)行身份驗(yàn)證的 X509Certificate
//clientCertificateRequired:一個(gè) Boolean 值,指定客戶端是否必須為身份驗(yàn)證提供證書
//checkCertificateRevocation:一個(gè) Boolean 值,指定在身份驗(yàn)證過程中是否檢查證書吊銷列表
await SslStream.AuthenticateAsServerAsync(new SslServerAuthenticationOptions()
{
ServerCertificate = x509Certificate,
ClientCertificateRequired = mutuallyAuthenticate,
CertificateRevocationCheckMode = checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck
}, cancellationToken).ConfigureAwait(false);
if (!SslStream.IsEncrypted || !SslStream.IsAuthenticated)
{
returnfalse;
}
if (mutuallyAuthenticate && !SslStream.IsMutuallyAuthenticated)
{
returnfalse;
}
}
catch (Exception)
{
throw;
}
//完成驗(yàn)證后,通過SslStream傳輸數(shù)據(jù)
int readCount = await SslStream.ReadAsync(buffer, _lifecycleTokenSource.Token)
.ConfigureAwait(false);客戶端配置
var _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);
if (_options.IsSsl) //如果使用ssl加密傳輸
{
if (_options.AllowingUntrustedSSLCertificate)//是否允許不受信任的證書
{
_sslStream = new SslStream(_networkStream, false,
(obj, certificate, chain, error) => true);
}
else
{
_sslStream = new SslStream(_networkStream, false);
}
_sslStream.ReadTimeout = _options.ReadTimeout;
_sslStream.WriteTimeout = _options.WriteTimeout;
await _sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions()
{
TargetHost = RemoteEndPoint.Address.ToString(),
EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12,
CertificateRevocationCheckMode = _options.CheckCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck,
ClientCertificates = new X509CertificateCollection() { _certificate }
}, connectTokenSource.Token).ConfigureAwait(false);
if (!_sslStream.IsEncrypted || !_sslStream.IsAuthenticated ||
(_options.MutuallyAuthenticate && !_sslStream.IsMutuallyAuthenticated))
{
thrownew InvalidOperationException("SSL authenticated faild!");
}
}KeepAlive
keepAlive不是TCP協(xié)議中的,而是各個(gè)操作系統(tǒng)本身實(shí)現(xiàn)的功能,主要是防止一些Socket突然斷開后沒有被感知到,導(dǎo)致一直浪費(fèi)資源的情況。
其基本原理是在此機(jī)制開啟時(shí),當(dāng)長(zhǎng)連接無數(shù)據(jù)交互一定時(shí)間間隔時(shí),連接的一方會(huì)向?qū)Ψ桨l(fā)送?;钐綔y(cè)包,如連接仍正常,對(duì)方將對(duì)此確認(rèn)回應(yīng)
C#中如何調(diào)用操作系統(tǒng)的KeepAlive?
/// <summary>
/// 開啟Socket的KeepAlive
/// 設(shè)置tcp協(xié)議的一些KeepAlive參數(shù)
/// </summary>
/// <param name="socket"></param>
/// <param name="tcpKeepAliveInterval">沒有接收到對(duì)方確認(rèn),繼續(xù)發(fā)送KeepAlive的發(fā)送頻率</param>
/// <param name="tcpKeepAliveTime">KeepAlive的空閑時(shí)長(zhǎng),或者說每次正常發(fā)送心跳的周期</param>
/// <param name="tcpKeepAliveRetryCount">KeepAlive之后設(shè)置最大允許發(fā)送?;钐綔y(cè)包的次數(shù),到達(dá)此次數(shù)后直接放棄嘗試,并關(guān)閉連接</param>
internal static void SetKeepAlive(this Socket socket, int tcpKeepAliveInterval, int tcpKeepAliveTime, int tcpKeepAliveRetryCount)
{
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, tcpKeepAliveInterval);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, tcpKeepAliveTime);
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, tcpKeepAliveRetryCount);
}具體的開啟,還需要看操作系統(tǒng)的版本以及不同操作系統(tǒng)的支持。
粘包斷包處理
Pipe & ReadOnlySequence

上圖來自微軟官方博客:https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/
TCP面向應(yīng)用是流式數(shù)據(jù)傳輸,所以接收端接到的數(shù)據(jù)是像流水一樣從管道中傳來,每次取到的數(shù)據(jù)取決于應(yīng)用設(shè)置的緩沖區(qū)大小,以及套接字本身緩沖區(qū)待讀取字節(jié)數(shù)。
C#中提供的Pipe就如上圖一樣,是一個(gè)管道Pipe有兩個(gè)對(duì)象成員,一個(gè)是PipeWriter,一個(gè)是PipeReader,可以理解為一個(gè)是生產(chǎn)者,專門往管道里灌輸數(shù)據(jù)流,即字節(jié)流,一個(gè)是消費(fèi)者,專門從管道里獲取字節(jié)流進(jìn)行處理。
可以看到Pipe中的數(shù)據(jù)包是用鏈表關(guān)聯(lián)的,但是這個(gè)數(shù)據(jù)包是從Socke緩沖區(qū)每次取到的數(shù)據(jù)包,它不一定是一個(gè)完整的數(shù)據(jù)包,所以這些數(shù)據(jù)包連接起來后形成了一個(gè)C#提供的另外一個(gè)抽象的對(duì)象ReadOnlySequence。
但是這里還是沒有提供太好的處理斷包和粘包的辦法,因?yàn)閿喟嘲奶幚硇枰獌煞矫?/p>
1、業(yè)務(wù)數(shù)據(jù)包的定義
2、數(shù)據(jù)流切割出一個(gè)個(gè)完整的數(shù)據(jù)包
假設(shè)業(yè)務(wù)已經(jīng)定義好了數(shù)據(jù)包,那么我們?nèi)绾螐腜ipe中這些數(shù)據(jù)包根據(jù)業(yè)務(wù)定義來從不同的數(shù)據(jù)包中切割出一個(gè)完整的包,那么就需要ReadOnlySequence,它提供的操作方法,非常方便我們?nèi)デ懈顢?shù)據(jù),主要是頭尾數(shù)據(jù)包的切割。
假設(shè)我們業(yè)務(wù)層定義了一個(gè)數(shù)據(jù)包結(jié)構(gòu),數(shù)據(jù)包是不定長(zhǎng)的,包體長(zhǎng)度每次都寫在包頭里,我們來實(shí)現(xiàn)一個(gè)數(shù)據(jù)包過濾器。
//收到消息
while (!_receiveDataTokenSource.Token.IsCancellationRequested)
{
try
{
//從pipe中獲取緩沖區(qū)
Memory<byte> buffer = _pipeWriter.GetMemory(_options.BufferSize);
int readCount = 0;
readCount = await _sslStream.ReadAsync(buffer, _lifecycleTokenSource.Token).ConfigureAwait(false);
if (readCount > 0)
{
var data = buffer.Slice(0, readCount);
//告知消費(fèi)者,往Pipe的管道中寫入了多少字節(jié)數(shù)據(jù)
_pipeWriter.Advance(readCount);
}
else
{
if (IsDisconnect())
{
await DisConnectAsync();
}
thrownew SocketException();
}
FlushResult result = await _pipeWriter.FlushAsync().ConfigureAwait(false);
if (result.IsCompleted)
{
break;
}
}
catch (IOException)
{
//TODO log
break;
}
catch (SocketException)
{
//TODO log
break;
}
catch (TaskCanceledException)
{
//TODO log
break;
}
}
_pipeWriter.Complete();
//消費(fèi)者處理數(shù)據(jù)
while (!_lifecycleTokenSource.Token.IsCancellationRequested)
{
ReadResult result = await _pipeReader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
ReadOnlySequence<byte> data;
do
{
//通過過濾器得到一個(gè)完整的包
data = _receivePackageFilter.ResolvePackage(ref buffer);
if (!data.IsEmpty)
{
OnReceivedData?.Invoke(this, new ClientDataReceiveEventArgs(data.ToArray()));
}
}
while (!data.IsEmpty && buffer.Length > 0);
_pipeReader.AdvanceTo(buffer.Start);
}
_pipeReader.Complete();
/// <summary>
/// 解析數(shù)據(jù)包
/// 固定報(bào)文頭解析協(xié)議
/// </summary>
/// <param name="headerSize">數(shù)據(jù)報(bào)文頭的大小</param>
/// <param name="bodyLengthIndex">數(shù)據(jù)包大小在報(bào)文頭中的位置</param>
/// <param name="bodyLengthBytes">數(shù)據(jù)包大小在報(bào)文頭中的長(zhǎng)度</param>
/// <param name="IsLittleEndian">數(shù)據(jù)報(bào)文大小端。windows中通常是小端,unix通常是大端模式</param>
/// </summary>
/// <param name="sequence">一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包</param>
public override ReadOnlySequence<byte> ResolvePackage(ref ReadOnlySequence<byte> sequence)
{
var len = sequence.Length;
if (len < _bodyLengthIndex) returndefault;
var bodyLengthSequence = sequence.Slice(_bodyLengthIndex, _bodyLengthBytes);
byte[] bodyLengthBytes = ArrayPool<byte>.Shared.Rent(_bodyLengthBytes);
try
{
int index = 0;
foreach (var item in bodyLengthSequence)
{
Array.Copy(item.ToArray(), 0, bodyLengthBytes, index, item.Length);
index += item.Length;
}
long bodyLength = 0;
int offset = 0;
if (!_isLittleEndian)
{
offset = bodyLengthBytes.Length - 1;
foreach (var bytes in bodyLengthBytes)
{
bodyLength += bytes << (offset * 8);
offset--;
}
}
else
{
foreach (var bytes in bodyLengthBytes)
{
bodyLength += bytes << (offset * 8);
offset++;
}
}
if (sequence.Length < _headerSize + bodyLength)
returndefault;
var endPosition = sequence.GetPosition(_headerSize + bodyLength);
var data = sequence.Slice(0, endPosition);//得到完整數(shù)據(jù)包
sequence = sequence.Slice(endPosition);//緩沖區(qū)中去除取到的完整包
return data;
}
finally
{
ArrayPool<byte>.Shared.Return(bodyLengthBytes);
}
}以上就是實(shí)現(xiàn)了固定數(shù)據(jù)包頭實(shí)現(xiàn)粘包斷包處理的部分代碼。
關(guān)于TCP的連接還有一些,比如客戶端連接限制,空閑連接關(guān)閉等。
如果大家對(duì)于完整代碼感興趣,可以看我剛寫的一個(gè)TCP庫(kù)
EasyTcp4Net
https://github.com/BruceQiu1996/EasyTcp4Net



到此這篇關(guān)于C#優(yōu)雅處理TCP數(shù)據(jù)的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)C# TCP數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#并行編程之?dāng)?shù)據(jù)并行Tasks.Parallel類
這篇文章介紹了C#并行編程之?dāng)?shù)據(jù)并行Tasks.Parallel類,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-05-05
c#實(shí)現(xiàn)斷點(diǎn)續(xù)傳功能示例分享
這篇文章主要介紹了c#實(shí)現(xiàn)的斷點(diǎn)續(xù)傳功能示例,斷點(diǎn)續(xù)傳就是在上一次下載時(shí)斷開的位置開始繼續(xù)下載。在HTTP協(xié)議中,可以在請(qǐng)求報(bào)文頭中加入Range段,來表示客戶機(jī)希望從何處繼續(xù)下載,下面是示例,需要的朋友可以參考下2014-03-03
C#使用反射(Reflect)獲取dll文件中的類型并調(diào)用方法
這篇文章主要為大家詳細(xì)介紹了C#使用反射(Reflect)獲取dll文件中的類型并調(diào)用方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-10-10
C#實(shí)現(xiàn)12306自動(dòng)登錄的方法
本文介紹了C#實(shí)現(xiàn)12306自動(dòng)登錄的方法,主要方法是捕獲參數(shù)和url并補(bǔ)充參數(shù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2015-07-07
c#同步兩個(gè)子目錄文件示例分享 兩個(gè)文件夾同步
這篇文章主要介紹了使用c#同步兩個(gè)子目錄文件的方法,大家參考使用吧2014-01-01

