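Implementation Notes on a High-Performance Asynchronous Socket Wrapper Library in C#
Preface
Sockets are among the most common ways for programs to communicate. C# offers many ways to do socket communication, and the most efficient of them is asynchronous I/O.
On Windows, asynchronous socket I/O is built on I/O completion ports (IOCP); for how completion ports work, see the many articles available online.
What I want to stress here is that completion-port-based asynchronous I/O is the most efficient way to do network communication on Windows, bar none.
Asynchronous communication is much harder to get right than synchronous communication, and the code has many pitfalls that are hard to avoid without experience.
After collecting a large amount of material, I wrote a wrapper around asynchronous sockets. The library has been running stably and efficiently for several months.
Looking through what is available online, I have not yet found a wrapper library I was satisfied with. Many articles mix data transfer with protocol handling, which makes the code very hard to follow and impossible to extend.
This library was written to avoid those flaws: the processing logic is layered and modular, and it aims for both ease of use and high performance.
To give a first impression of the throughput, here is a test screenshot.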

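Host configuration

The 100 Mbit/s link is essentially saturated at 40% total CPU usage. My machine sits at roughly 20% CPU when idle, so the program itself accounts for about 20%.
The library scales: even with 100,000 connections, sending and receiving the same amount of data uses about the same CPU.
Library structure diagram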

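Goals
Works both as a server (listening) and as a client (actively connecting).
Fits any network protocol. Data is delivered either as a raw byte stream or as complete packets; the packet contents are never interpreted.
Ease of use. The complex low-level handling is encapsulated behind a friendly public interface.
High performance. The processing is optimized as far as possible: a single machine can handle tens of thousands of connections at several hundred Mbit/s.
Implementation approach
The network-handling logic breaks down into the following parts:
Listening: can listen on multiple ports. It accepts sockets and hands them on for further processing. The listener does only this one job and can be optimized further if necessary.
Active connection: connects to the peer either asynchronously or synchronously. Once connected, the socket is handled exactly like one produced by the listener.
Socket send/receive: each socket has its own send/receive instance, which deals only in byte streams. Both directions are optimized: on send, queued data is coalesced into larger writes to raise throughput; on receive, a 1 KB buffer is posted for each read.
Packet assembly: packets usually carry a length field; for example, if the first two bytes of the header give the length, that value is enough to assemble a complete packet.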
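The packet-assembly step can be sketched as follows. This is a minimal illustration, not the library's ClientPacketManage class (which is not shown in this article). The class name LengthPrefixedAssembler is hypothetical, and it assumes the two-byte length is big-endian and counts the whole packet, header included.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the library: reassembles packets whose
// first two bytes hold the total packet length (assumed big-endian, header included).
public class LengthPrefixedAssembler
{
    private readonly List<byte> _pending = new List<byte>();
    private readonly int _maxLen;

    public LengthPrefixedAssembler(int maxLen)
    {
        _maxLen = maxLen;
    }

    // Feed one received chunk; returns every packet completed by it.
    public List<byte[]> Feed(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var packets = new List<byte[]>();
        while (_pending.Count >= 2)
        {
            int total = (_pending[0] << 8) | _pending[1];
            if (total < 2 || total > _maxLen)
                throw new InvalidOperationException("Invalid packet length: " + total);
            if (_pending.Count < total)
                break; // incomplete packet: wait for more bytes
            packets.Add(_pending.GetRange(0, total).ToArray());
            _pending.RemoveRange(0, total);
        }
        return packets;
    }
}
```

Feeding the bytes 00 05 01 yields no packet, because the header announces five bytes and only three have arrived. Feeding 02 03 00 03 09 next yields two packets: the completed five-byte packet and a three-byte packet that arrived in the same chunk.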
NetListener: listening
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace IocpCore
{
class NetListener
{
private Socket listenSocket;
public ListenParam _listenParam { get; set; }
public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;
bool start;
NetServer _netServer;
public NetListener(NetServer netServer)
{
_netServer = netServer;
}
public int _acceptAsyncCount = 0;
public bool StartListen()
{
try
{
start = true;
IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(listenPoint);
listenSocket.Listen(200);
Thread thread1 = new Thread(new ThreadStart(NetProcess));
thread1.Start();
StartAccept();
return true;
}
catch (Exception ex)
{
NetLogger.Log(string.Format("**Listen failed! {0}", ex.Message));
return false;
}
}
AutoResetEvent _acceptEvent = new AutoResetEvent(false);
private void NetProcess()
{
while (start)
{
DealNewAccept();
_acceptEvent.WaitOne(1000 * 10);
}
}
private void DealNewAccept()
{
try
{
if(_acceptAsyncCount <= 10)
{
StartAccept();
}
while (true)
{
AsyncSocketClient client = _newSocketClientList.GetObj();
if (client == null)
break;
DealNewAccept(client);
}
}
catch (Exception ex)
{
NetLogger.Log(string.Format("DealNewAccept exception {0}***{1}", ex.Message, ex.StackTrace));
}
}
private void DealNewAccept(AsyncSocketClient client)
{
client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
OnAcceptSocket?.Invoke(_listenParam, client);
}
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
{
try
{
Interlocked.Decrement(ref _acceptAsyncCount);
_acceptEvent.Set();
acceptEventArgs.Completed -= AcceptEventArg_Completed;
ProcessAccept(acceptEventArgs);
}
catch (Exception ex)
{
NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
}
}
public bool StartAccept()
{
SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
acceptEventArgs.Completed += AcceptEventArg_Completed;
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
Interlocked.Increment(ref _acceptAsyncCount);
if (!willRaiseEvent)
{
Interlocked.Decrement(ref _acceptAsyncCount);
_acceptEvent.Set();
acceptEventArgs.Completed -= AcceptEventArg_Completed;
ProcessAccept(acceptEventArgs);
}
return true;
}
ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();
private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
{
try
{
using (acceptEventArgs)
{
if (acceptEventArgs.AcceptSocket != null)
{
AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
client.CreateClientInfo(this);
_newSocketClientList.PutObj(client);
_acceptEvent.Set();
}
}
}
catch (Exception ex)
{
NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
}
}
}
}
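The listener relies on an ObjectPool<AsyncSocketClient> whose source is not included in this article. The sketch below shows one shape it might take, assuming that PutObj enqueues and reports success, and that GetObj returns null when nothing is queued (which is what the `client == null` check in DealNewAccept expects). Despite its name, it is used here as a thread-safe FIFO queue.

```csharp
using System.Collections.Generic;

// Assumed shape of the ObjectPool<T> helper used by the listings in this
// article; the real class is not shown, so treat this as a sketch only.
public class ObjectPool<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    // Enqueue an item; returns false for null so callers can detect misuse.
    public bool PutObj(T obj)
    {
        if (obj == null) return false;
        lock (_sync) { _items.Enqueue(obj); }
        return true;
    }

    // Dequeue the oldest item, or return null when the queue is empty.
    public T GetObj()
    {
        lock (_sync)
        {
            return _items.Count > 0 ? _items.Dequeue() : null;
        }
    }
}
```

A private lock object is used instead of `lock (this)` so that outside code cannot take the same lock by accident.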
NetConnectManage: connection handling
using System;
using System.Net;
using System.Net.Sockets;
namespace IocpCore
{
class NetConnectManage
{
public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;
public bool ConnectAsyn(string peerIp, int peerPort, object tag)
{
try
{
Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
socketEventArgs.Completed += SocketConnect_Completed;
SocketClientInfo clientInfo = new SocketClientInfo();
socketEventArgs.UserToken = clientInfo;
clientInfo.PeerIp = peerIp;
clientInfo.PeerPort = peerPort;
clientInfo.Tag = tag;
bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
if (!willRaiseEvent)
{
ProcessConnect(socketEventArgs);
socketEventArgs.Completed -= SocketConnect_Completed;
socketEventArgs.Dispose();
}
return true;
}
catch (Exception ex)
{
NetLogger.Log("ConnectAsyn",ex);
return false;
}
}
private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
{
ProcessConnect(socketEventArgs);
socketEventArgs.Completed -= SocketConnect_Completed;
socketEventArgs.Dispose();
}
private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
{
SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
if (socketEventArgs.SocketError == SocketError.Success)
{
DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
}
else
{
SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
socketParam.ClientInfo = clientInfo;
OnSocketConnectEvent?.Invoke(socketParam, null);
}
}
void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
{
clientInfo.SetClientInfo(socket);
AsyncSocketClient client = new AsyncSocketClient(socket);
client.SetClientInfo(clientInfo);
// raise the event
SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
socketParam.ClientInfo = clientInfo;
OnSocketConnectEvent?.Invoke(socketParam, client);
}
public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
{
socket = null;
try
{
Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);
SocketClientInfo clientInfo = new SocketClientInfo();
clientInfo.PeerIp = peerIp;
clientInfo.PeerPort = peerPort;
clientInfo.Tag = tag;
EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
socketTmp.Connect(remoteEP);
if (!socketTmp.Connected)
return false;
DealConnectSocket(socketTmp, clientInfo);
socket = socketTmp;
return true;
}
catch (Exception ex)
{
NetLogger.Log(string.Format("Failed to connect to peer ({0}:{1})!", peerIp, peerPort), ex);
return false;
}
}
}
}
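The `willRaiseEvent` check in ConnectAsyn is the easiest part of the SocketAsyncEventArgs pattern to get wrong, so here it is in isolation. This is a minimal sketch: ConnectAsyncDemo and ConnectAndWait are illustrative names, and the caller is blocked only to keep the example short; the library itself never blocks here.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

// Illustrative only: shows how the bool returned by ConnectAsync decides
// whether the Completed event will fire. Not part of the library.
public static class ConnectAsyncDemo
{
    // Connects to endPoint, waits for the operation to finish and returns
    // the SocketError it reported.
    public static SocketError ConnectAndWait(IPEndPoint endPoint)
    {
        using (var done = new ManualResetEventSlim(false))
        using (var args = new SocketAsyncEventArgs())
        {
            var socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            args.RemoteEndPoint = endPoint;
            args.Completed += (sender, e) => done.Set();

            // true:  the operation is pending and Completed will be raised later.
            // false: it finished synchronously and Completed will NOT be raised,
            //        so the result must be handled right here.
            bool willRaiseEvent = socket.ConnectAsync(args);
            if (willRaiseEvent)
                done.Wait();

            SocketError result = args.SocketError;
            socket.Close();
            return result;
        }
    }
}
```

Forgetting the `false` branch makes a connection that completes synchronously go unnoticed, which is why ConnectAsyn calls ProcessConnect directly in that case.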
AsyncSocketClient: socket send/receive handling
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
namespace IocpCore
{
public class AsyncSocketClient
{
public static int IocpReadLen = 1024;
public readonly Socket ConnectSocket;
protected SocketAsyncEventArgs m_receiveEventArgs;
public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
protected byte[] m_asyncReceiveBuffer;
protected SocketAsyncEventArgs m_sendEventArgs;
public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
protected byte[] m_asyncSendBuffer;
public event Action<AsyncSocketClient, byte[]> OnReadData;
public event Action<AsyncSocketClient, int> OnSendData;
public event Action<AsyncSocketClient> OnSocketClose;
static object releaseLock = new object();
public static int createCount = 0;
public static int releaseCount = 0;
~AsyncSocketClient()
{
lock (releaseLock)
{
releaseCount++;
}
}
public AsyncSocketClient(Socket socket)
{
lock (releaseLock)
{
createCount++;
}
ConnectSocket = socket;
m_receiveEventArgs = new SocketAsyncEventArgs();
m_asyncReceiveBuffer = new byte[IocpReadLen];
m_receiveEventArgs.AcceptSocket = ConnectSocket;
m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;
m_sendEventArgs = new SocketAsyncEventArgs();
m_asyncSendBuffer = new byte[IocpReadLen * 2];
m_sendEventArgs.AcceptSocket = ConnectSocket;
m_sendEventArgs.Completed += SendEventArgs_Completed;
}
SocketClientInfo _clientInfo;
public SocketClientInfo ClientInfo
{
get
{
return _clientInfo;
}
}
internal void CreateClientInfo(NetListener netListener)
{
_clientInfo = new SocketClientInfo();
try
{
_clientInfo.Tag = netListener._listenParam._tag;
IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
Debug.Assert(netListener._listenParam._port == ip.Port);
_clientInfo.LocalIp = ip.Address.ToString();
_clientInfo.LocalPort = netListener._listenParam._port;
ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
_clientInfo.PeerIp = ip.Address.ToString();
_clientInfo.PeerPort = ip.Port;
}
catch (Exception ex)
{
NetLogger.Log("CreateClientInfo", ex);
}
}
internal void SetClientInfo(SocketClientInfo clientInfo)
{
_clientInfo = clientInfo;
}
#region read process
bool _inReadPending = false;
public EN_SocketReadResult ReadNextData()
{
lock (this)
{
if (_socketError)
return EN_SocketReadResult.ReadError;
if (_inReadPending)
return EN_SocketReadResult.InAsyn;
if(!ConnectSocket.Connected)
{
OnReadError();
return EN_SocketReadResult.ReadError;
}
try
{
m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
_inReadPending = true;
bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); // post the receive request
if (!willRaiseEvent)
{
_inReadPending = false;
ProcessReceive();
if (_socketError)
{
OnReadError();
return EN_SocketReadResult.ReadError;
}
return EN_SocketReadResult.HaveRead;
}
else
{
return EN_SocketReadResult.InAsyn;
}
}
catch (Exception ex)
{
NetLogger.Log("ReadNextData", ex);
_inReadPending = false;
OnReadError();
return EN_SocketReadResult.ReadError;
}
}
}
private void ProcessReceive()
{
if (ReceiveEventArgs.BytesTransferred > 0
&& ReceiveEventArgs.SocketError == SocketError.Success)
{
int offset = ReceiveEventArgs.Offset;
int count = ReceiveEventArgs.BytesTransferred;
byte[] readData = new byte[count];
Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);
_inReadPending = false;
if (!_socketError)
OnReadData?.Invoke(this, readData);
}
else
{
_inReadPending = false;
OnReadError();
}
}
private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
{
lock (this)
{
_inReadPending = false;
ProcessReceive();
if (_socketError)
{
OnReadError();
}
}
}
bool _socketError = false;
private void OnReadError()
{
lock (this)
{
if (_socketError == false)
{
_socketError = true;
OnSocketClose?.Invoke(this);
}
CloseClient();
}
}
#endregion
#region send process
int _sendBufferByteCount = 102400;
public int SendBufferByteCount
{
get
{
return _sendBufferByteCount;
}
set
{
if (value < 1024)
{
_sendBufferByteCount = 1024;
}
else
{
_sendBufferByteCount = value;
}
}
}
SendBufferPool _sendDataPool = new SendBufferPool();
internal EN_SendDataResult PutSendData(byte[] data)
{
if (_socketError)
return EN_SendDataResult.no_client;
if (_sendDataPool._bufferByteCount >= _sendBufferByteCount)
{
return EN_SendDataResult.buffer_overflow;
}
if (data.Length <= IocpReadLen)
{
_sendDataPool.PutObj(data);
}
else
{
List<byte[]> dataItems = SplitData(data, IocpReadLen);
foreach (byte[] item in dataItems)
{
_sendDataPool.PutObj(item);
}
}
return EN_SendDataResult.ok;
}
bool _inSendPending = false;
public EN_SocketSendResult SendNextData()
{
lock (this)
{
if (_socketError)
{
return EN_SocketSendResult.SendError;
}
if (_inSendPending)
{
return EN_SocketSendResult.InAsyn;
}
int sendByteCount = GetSendData();
if (sendByteCount == 0)
{
return EN_SocketSendResult.NoSendData;
}
// check the connection first to avoid an exception, which would hurt performance
if (!ConnectSocket.Connected)
{
OnSendError();
return EN_SocketSendResult.SendError;
}
try
{
m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);
_inSendPending = true;
bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);
if (!willRaiseEvent)
{
_inSendPending = false;
ProcessSend(m_sendEventArgs);
if (_socketError)
{
OnSendError();
return EN_SocketSendResult.SendError;
}
else
{
OnSendData?.Invoke(this, sendByteCount);
// continue with the next item
return EN_SocketSendResult.HaveSend;
}
}
else
{
return EN_SocketSendResult.InAsyn;
}
}
catch (Exception ex)
{
NetLogger.Log("SendNextData", ex);
_inSendPending = false;
OnSendError();
return EN_SocketSendResult.SendError;
}
}
}
private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
{
lock (this)
{
try
{
_inSendPending = false;
ProcessSend(m_sendEventArgs);
int sendCount = 0;
if (sendEventArgs.SocketError == SocketError.Success)
{
sendCount = sendEventArgs.BytesTransferred;
}
OnSendData?.Invoke(this, sendCount);
if (_socketError)
{
OnSendError();
}
}
catch (Exception ex)
{
NetLogger.Log("SendEventArgs_Completed", ex);
}
}
}
private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
{
if (sendEventArgs.SocketError == SocketError.Success)
{
return true;
}
else
{
OnSendError();
return false;
}
}
private int GetSendData()
{
int dataLen = 0;
while (true)
{
byte[] data = _sendDataPool.GetObj();
if (data == null)
return dataLen;
Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
dataLen += data.Length;
if (dataLen > IocpReadLen)
break;
}
return dataLen;
}
private void OnSendError()
{
lock (this)
{
if (_socketError == false)
{
_socketError = true;
OnSocketClose?.Invoke(this);
}
CloseClient();
}
}
#endregion
internal void CloseSocket()
{
try
{
ConnectSocket.Close();
}
catch (Exception ex)
{
NetLogger.Log("CloseSocket", ex);
}
}
static object socketCloseLock = new object();
public static int closeSendCount = 0;
public static int closeReadCount = 0;
bool _disposeSend = false;
void CloseSend()
{
if (!_disposeSend && !_inSendPending)
{
lock (socketCloseLock)
closeSendCount++;
_disposeSend = true;
m_sendEventArgs.SetBuffer(null, 0, 0);
m_sendEventArgs.Completed -= SendEventArgs_Completed;
m_sendEventArgs.Dispose();
}
}
bool _disposeRead = false;
void CloseRead()
{
if (!_disposeRead && !_inReadPending)
{
lock (socketCloseLock)
closeReadCount++;
_disposeRead = true;
m_receiveEventArgs.SetBuffer(null, 0, 0);
m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
m_receiveEventArgs.Dispose();
}
}
private void CloseClient()
{
try
{
CloseSend();
CloseRead();
ConnectSocket.Close();
}
catch (Exception ex)
{
NetLogger.Log("CloseClient", ex);
}
}
// split data into chunks of at most maxLen bytes
private List<byte[]> SplitData(byte[] data, int maxLen)
{
List<byte[]> items = new List<byte[]>();
int start = 0;
while (true)
{
int itemLen = Math.Min(maxLen, data.Length - start);
if (itemLen == 0)
break;
byte[] item = new byte[itemLen];
Array.Copy(data, start, item, 0, itemLen);
items.Add(item);
start += itemLen;
}
return items;
}
}
public enum EN_SocketReadResult
{
InAsyn,
HaveRead,
ReadError
}
public enum EN_SocketSendResult
{
InAsyn,
HaveSend,
NoSendData,
SendError
}
class SendBufferPool
{
ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();
public Int64 _bufferByteCount = 0;
public bool PutObj(byte[] obj)
{
if (_bufferPool.PutObj(obj))
{
lock (this)
{
_bufferByteCount += obj.Length;
}
return true;
}
else
{
return false;
}
}
public byte[] GetObj()
{
byte[] result = _bufferPool.GetObj();
if (result != null)
{
lock (this)
{
_bufferByteCount -= result.Length;
}
}
return result;
}
}
}
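The send-side batching done by PutSendData, SplitData and GetSendData can be followed more easily outside the class. The sketch below mirrors that logic with a plain Queue<byte[]> standing in for SendBufferPool; the names SendCoalescingDemo, Split and FillSendBuffer are illustrative and not part of the library.

```csharp
using System;
using System.Collections.Generic;

// Standalone illustration of the send batching in AsyncSocketClient.
public static class SendCoalescingDemo
{
    public const int ChunkLen = 1024; // mirrors IocpReadLen

    // Split a payload into pieces of at most maxLen bytes (as SplitData does).
    public static List<byte[]> Split(byte[] data, int maxLen)
    {
        var items = new List<byte[]>();
        for (int start = 0; start < data.Length; start += maxLen)
        {
            int len = Math.Min(maxLen, data.Length - start);
            var item = new byte[len];
            Array.Copy(data, start, item, 0, len);
            items.Add(item);
        }
        return items;
    }

    // Drain queued pieces into sendBuffer until more than ChunkLen bytes are
    // batched or the queue is empty (as GetSendData does). Because every piece
    // is at most ChunkLen bytes, a buffer of 2 * ChunkLen can never overflow.
    public static int FillSendBuffer(Queue<byte[]> queue, byte[] sendBuffer)
    {
        int filled = 0;
        while (queue.Count > 0 && filled <= ChunkLen)
        {
            byte[] piece = queue.Dequeue();
            Array.Copy(piece, 0, sendBuffer, filled, piece.Length);
            filled += piece.Length;
        }
        return filled;
    }
}
```

Queuing messages of 300, 500 and 2500 bytes produces pieces of 300, 500, 1024, 1024 and 452 bytes. The first call to FillSendBuffer batches 1824 bytes and the second batches the remaining 1476, so five queued pieces go out in two send operations.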
NetServer: ties the other classes together
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
namespace IocpCore
{
public class NetServer
{
public Action<SocketEventParam> OnSocketPacketEvent;
// send buffer size per connection
public int SendBufferBytePerClient { get; set; } = 1024 * 100;
bool _serverStart = false;
List<NetListener> _listListener = new List<NetListener>();
// assembles the received byte stream into complete packets
ClientPacketManage _clientPacketManage;
public Int64 SendByteCount { get; set; }
public Int64 ReadByteCount { get; set; }
List<ListenParam> _listListenPort = new List<ListenParam>();
public void AddListenPort(int port, object tag)
{
_listListenPort.Add(new ListenParam(port, tag));
}
/// <summary>
///
/// </summary>
/// <param name="listenFault">ports that failed to start listening</param>
/// <returns></returns>
public bool StartListen(out List<int> listenFault)
{
_serverStart = true;
_clientPacketManage = new ClientPacketManage(this);
_clientPacketManage.OnSocketPacketEvent += PutClientPacket;
_netConnectManage.OnSocketConnectEvent += SocketConnectEvent;
_listListener.Clear();
Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
thread1.Start();
Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
thread2.Start();
Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
thread3.Start();
listenFault = new List<int>();
foreach (ListenParam param in _listListenPort)
{
NetListener listener = new NetListener(this);
listener._listenParam = param;
listener.OnAcceptSocket += Listener_OnAcceptSocket;
if (!listener.StartListen())
{
listenFault.Add(param._port);
}
else
{
_listListener.Add(listener);
NetLogger.Log(string.Format("Listening on port {0}", param._port));
}
}
return listenFault.Count == 0;
}
public void PutClientPacket(SocketEventParam param)
{
OnSocketPacketEvent?.Invoke(param);
}
// minimum and maximum packet length
int _packetMinLen;
int _packetMaxLen;
public int PacketMinLen
{
get { return _packetMinLen; }
}
public int PacketMaxLen
{
get { return _packetMaxLen; }
}
/// <summary>
/// Sets the minimum and maximum packet length.
/// When minLen = 0, incoming data is treated as a raw byte stream.
/// </summary>
/// <param name="minLen"></param>
/// <param name="maxLen"></param>
public void SetPacketParam(int minLen, int maxLen)
{
Debug.Assert(minLen >= 0);
Debug.Assert(maxLen > minLen);
_packetMinLen = minLen;
_packetMaxLen = maxLen;
}
// callback that returns the total length of a packet
public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;
ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();
private void NetPacketProcess()
{
while (_serverStart)
{
try
{
DealEventPool();
}
catch (Exception ex)
{
NetLogger.Log(string.Format("DealEventPool exception {0}***{1}", ex.Message, ex.StackTrace));
}
_socketEventPool.WaitOne(1000);
}
}
Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();
public int ClientCount
{
get
{
lock (_clientGroup)
{
return _clientGroup.Count;
}
}
}
public List<Socket> ClientList
{
get
{
lock (_clientGroup)
{
return _clientGroup.Keys.ToList();
}
}
}
private void DealEventPool()
{
while (true)
{
SocketEventParam param = _socketEventPool.GetObj();
if (param == null)
return;
if (param.SocketEvent == EN_SocketEvent.close)
{
lock (_clientGroup)
{
_clientGroup.Remove(param.Socket);
}
}
if (_packetMinLen == 0) // byte-stream mode
{
OnSocketPacketEvent?.Invoke(param);
}
else
{
// packet mode: assemble complete packets
_clientPacketManage.PutSocketParam(param);
}
}
}
private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
{
try
{
if (param.Socket == null || client == null) // connection failed
{
}
else
{
lock (_clientGroup)
{
bool remove = _clientGroup.Remove(client.ConnectSocket);
Debug.Assert(!remove);
_clientGroup.Add(client.ConnectSocket, client);
}
client.OnSocketClose += Client_OnSocketClose;
client.OnReadData += Client_OnReadData;
client.OnSendData += Client_OnSendData;
_listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
}
_socketEventPool.PutObj(param);
}
catch (Exception ex)
{
NetLogger.Log(string.Format("SocketConnectEvent exception {0}***{1}", ex.Message, ex.StackTrace));
}
}
internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
{
try
{
lock (_clientGroup)
{
if (!_clientGroup.ContainsKey(socket))
{
Debug.Assert(false);
return;
}
NetLogger.Log(string.Format("Invalid packet length! Length: {0}", packetLen));
AsyncSocketClient client = _clientGroup[socket];
client.CloseSocket();
}
}
catch (Exception ex)
{
NetLogger.Log(string.Format("OnRcvPacketLenError exception {0}***{1}", ex.Message, ex.StackTrace));
}
}
#region listen port
private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
{
try
{
lock (_clientGroup)
{
bool remove = _clientGroup.Remove(client.ConnectSocket);
Debug.Assert(!remove);
_clientGroup.Add(client.ConnectSocket, client);
}
client.OnSocketClose += Client_OnSocketClose;
client.OnReadData += Client_OnReadData;
client.OnSendData += Client_OnSendData;
_listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
param.ClientInfo = client.ClientInfo;
_socketEventPool.PutObj(param);
}
catch (Exception ex)
{
NetLogger.Log(string.Format("Listener_OnAcceptSocket exception {0}***{1}", ex.Message, ex.StackTrace));
}
}
ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();
private void NetSendProcess()
{
while (_serverStart)
{
DealSendEvent();
_listSendEvent.WaitOne(1000);
}
}
ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();
private void NetReadProcess()
{
while (_serverStart)
{
DealReadEvent();
_listReadEvent.WaitOne(1000);
}
}
private void DealSendEvent()
{
while (true)
{
SocketEventDeal item = _listSendEvent.GetObj();
if (item == null)
break;
switch (item.SocketEvent)
{
case EN_SocketDealEvent.send:
{
while (true)
{
EN_SocketSendResult result = item.Client.SendNextData();
if (result == EN_SocketSendResult.HaveSend)
continue;
else
break;
}
}
break;
case EN_SocketDealEvent.read:
{
Debug.Assert(false);
}
break;
}
}
}
private void DealReadEvent()
{
while (true)
{
SocketEventDeal item = _listReadEvent.GetObj();
if (item == null)
break;
switch (item.SocketEvent)
{
case EN_SocketDealEvent.read:
{
while (true)
{
EN_SocketReadResult result = item.Client.ReadNextData();
if (result == EN_SocketReadResult.HaveRead)
continue;
else
break;
}
}
break;
case EN_SocketDealEvent.send:
{
Debug.Assert(false);
}
break;
}
}
}
private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
{
// queue the next read
_listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
try
{
SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
param.ClientInfo = client.ClientInfo;
param.Data = readData;
_socketEventPool.PutObj(param);
lock (this)
{
ReadByteCount += readData.Length;
}
}
catch (Exception ex)
{
NetLogger.Log(string.Format("Client_OnReadData exception {0}***{1}", ex.Message, ex.StackTrace));
}
}
#endregion
private void Client_OnSendData(AsyncSocketClient client, int sendCount)
{
// queue the next send
_listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
lock (this)
{
SendByteCount += sendCount;
}
}
private void Client_OnSocketClose(AsyncSocketClient client)
{
try
{
SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
param.ClientInfo = client.ClientInfo;
_socketEventPool.PutObj(param);
}
catch (Exception ex)
{
NetLogger.Log(string.Format("Client_OnSocketClose exception {0}***{1}", ex.Message, ex.StackTrace));
}
}
/// <summary>
/// Puts data into the send buffer of the given connection.
/// </summary>
/// <param name="socket"></param>
/// <param name="data"></param>
/// <returns></returns>
public EN_SendDataResult SendData(Socket socket, byte[] data)
{
if (socket == null)
return EN_SendDataResult.no_client;
lock (_clientGroup)
{
if (!_clientGroup.ContainsKey(socket))
return EN_SendDataResult.no_client;
AsyncSocketClient client = _clientGroup[socket];
EN_SendDataResult result = client.PutSendData(data);
if (result == EN_SendDataResult.ok)
{
// queue the next send
_listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
}
return result;
}
}
/// <summary>
/// Sets the send buffer size of a single connection.
/// </summary>
/// <param name="socket"></param>
/// <param name="byteCount"></param>
/// <returns></returns>
public bool SetClientSendBuffer(Socket socket, int byteCount)
{
lock (_clientGroup)
{
if (!_clientGroup.ContainsKey(socket))
return false;
AsyncSocketClient client = _clientGroup[socket];
client.SendBufferByteCount = byteCount;
return true;
}
}
#region connect process
NetConnectManage _netConnectManage = new NetConnectManage();
/// <summary>
/// Connects to a peer asynchronously.
/// </summary>
/// <param name="peerIp"></param>
/// <param name="peerPort"></param>
/// <param name="tag"></param>
/// <returns></returns>
public bool ConnectAsyn(string peerIp, int peerPort, object tag)
{
return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
}
/// <summary>
/// Connects to a peer synchronously.
/// </summary>
/// <param name="peerIp"></param>
/// <param name="peerPort"></param>
/// <param name="tag"></param>
/// <param name="socket"></param>
/// <returns></returns>
public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
{
return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
}
#endregion
}
enum EN_SocketDealEvent
{
read,
send,
}
class SocketEventDeal
{
public AsyncSocketClient Client { get; set; }
public EN_SocketDealEvent SocketEvent { get; set; }
public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
{
Client = client;
SocketEvent = socketEvent;
}
}
}
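NetServer's worker threads drain a queue and then call WaitOne on it, using ObjectPoolWithEvent<T>, another helper whose source is not included here. The sketch below shows one plausible shape, assuming it is a thread-safe queue paired with an AutoResetEvent that is signalled on every PutObj.

```csharp
using System.Collections.Generic;
using System.Threading;

// Assumed shape of ObjectPoolWithEvent<T>; the real class is not shown in
// this article, so treat this as a sketch only.
public class ObjectPoolWithEvent<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);

    // Enqueue an item and wake a waiting worker thread.
    public bool PutObj(T obj)
    {
        if (obj == null) return false;
        lock (_sync) { _items.Enqueue(obj); }
        _signal.Set();
        return true;
    }

    // Dequeue the oldest item, or return null when the queue is empty.
    public T GetObj()
    {
        lock (_sync)
        {
            return _items.Count > 0 ? _items.Dequeue() : null;
        }
    }

    // Block until an item has been queued or the timeout elapses.
    public bool WaitOne(int millisecondsTimeout)
    {
        return _signal.WaitOne(millisecondsTimeout);
    }
}
```

With this shape, the drain-then-wait loops in NetPacketProcess, NetSendProcess and NetReadProcess do not lose wake-ups: an item queued after the drain but before WaitOne leaves the event signalled, so the wait returns immediately. The one-second timeout in those loops then acts only as a safety net.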
Using the library
The library is very simple to use. An example follows.
using IocpCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
namespace WarningClient
{
public class SocketServer
{
public Action<SocketEventParam> OnSocketEvent;
public Int64 SendByteCount
{
get
{
if (_netServer == null)
return 0;
return _netServer.SendByteCount;
}
}
public Int64 ReadByteCount
{
get
{
if (_netServer == null)
return 0;
return _netServer.ReadByteCount;
}
}
NetServer _netServer;
EN_PacketType _packetType = EN_PacketType.byteStream;
public void SetPacktType(EN_PacketType packetType)
{
_packetType = packetType;
if (_netServer == null)
return;
if (packetType == EN_PacketType.byteStream)
{
_netServer.SetPacketParam(0, 1024);
}
else
{
_netServer.SetPacketParam(9, 1024);
}
}
public bool Init(List<int> listenPort)
{
NetLogger.OnLogEvent += NetLogger_OnLogEvent;
_netServer = new NetServer();
SetPacktType(_packetType);
_netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
_netServer.OnSocketPacketEvent += SocketPacketDeal;
foreach (int n in listenPort)
{
_netServer.AddListenPort(n, n);
}
List<int> listenFault;
bool start = _netServer.StartListen(out listenFault);
return start;
}
int GetPacketTotalLen(byte[] data, int offset)
{
if (MainWindow._packetType == EN_PacketType.znss)
return GetPacketZnss(data, offset);
else
return GetPacketAnzhiyuan(data, offset);
}
int GetPacketAnzhiyuan(byte[] data, int offset)
{
int n = data[offset + 5] + 6;
return n;
}
int GetPacketZnss(byte[] data, int offset)
{
int packetLen = (int)(data[offset + 4]) + 5; // honor offset, as GetPacketAnzhiyuan does
return packetLen;
}
public bool ConnectAsyn(string peerIp, int peerPort, object tag)
{
return _netServer.ConnectAsyn(peerIp, peerPort, tag);
}
public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
{
return _netServer.Connect(peerIp, peerPort, tag, out socket);
}
private void NetLogger_OnLogEvent(string message)
{
AppLog.Log(message);
}
Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();
public int ClientCount
{
get
{
lock (_clientGroup)
{
return _clientGroup.Count;
}
}
}
public List<Socket> ClientList
{
get
{
if (_netServer != null)
return _netServer.ClientList;
return new List<Socket>();
}
}
void AddClient(SocketEventParam socketParam)
{
lock (_clientGroup)
{
_clientGroup.Remove(socketParam.Socket);
_clientGroup.Add(socketParam.Socket, socketParam);
}
}
void RemoveClient(SocketEventParam socketParam)
{
lock (_clientGroup)
{
_clientGroup.Remove(socketParam.Socket);
}
}
ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();
public ObjectPool<SocketEventParam> ReadDataPool
{
get
{
return _readDataPool;
}
}
private void SocketPacketDeal(SocketEventParam socketParam)
{
OnSocketEvent?.Invoke(socketParam);
if (socketParam.SocketEvent == EN_SocketEvent.read)
{
if (MainWindow._isShowReadPacket)
_readDataPool.PutObj(socketParam);
}
else if (socketParam.SocketEvent == EN_SocketEvent.accept)
{
AddClient(socketParam);
string peerIp = socketParam.ClientInfo.PeerIpPort;
AppLog.Log(string.Format("Client connected! Local port: {0}, peer: {1}",
socketParam.ClientInfo.LocalPort, peerIp));
}
else if (socketParam.SocketEvent == EN_SocketEvent.connect)
{
string peerIp = socketParam.ClientInfo.PeerIpPort;
if (socketParam.Socket != null)
{
AddClient(socketParam);
AppLog.Log(string.Format("Connected to peer! Local port: {0}, peer: {1}",
socketParam.ClientInfo.LocalPort, peerIp));
}
else
{
AppLog.Log(string.Format("Failed to connect to peer! Local port: {0}, peer: {1}",
socketParam.ClientInfo.LocalPort, peerIp));
}
}
else if (socketParam.SocketEvent == EN_SocketEvent.close)
{
MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
RemoveClient(socketParam);
string peerIp = socketParam.ClientInfo.PeerIpPort;
AppLog.Log(string.Format("Client disconnected! Local port: {0}, peer: {1}",
socketParam.ClientInfo.LocalPort, peerIp));
}
}
public EN_SendDataResult SendData(Socket socket, byte[] data)
{
if(socket == null)
{
MessageBox.Show("Not connected yet!");
return EN_SendDataResult.no_client;
}
return _netServer.SendData(socket, data);
}
internal void SendToAll(byte[] data)
{
lock (_clientGroup)
{
foreach (Socket socket in _clientGroup.Keys)
{
SendData(socket, data);
}
}
}
}
}

