

An implementation approach for a high-performance asynchronous socket wrapper library in C#

2019-10-29 21:07:56
Source: reposted
Contributor: site user

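Preface

Sockets are one of the most common ways for programs to communicate. C# offers many ways to implement socket communication, and the most efficient of them is asynchronous communication.

Asynchronous communication is actually handled through Windows I/O completion ports (IOCP). For how completion ports work internally, see the articles available online.

What I want to emphasize is that asynchronous communication built on the completion port mechanism is the most efficient communication method on Windows, bar none!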
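As a concrete illustration of the asynchronous model, here is a minimal sketch built on `SocketAsyncEventArgs`, the same .NET API the listings in this article use. `AsyncAcceptDemo` and `RunOnce` are hypothetical names made up for this example and are not part of the library. The sketch assumes a loopback connection and a short message that arrives in a single receive. The key point is the return value of `AcceptAsync` and `ReceiveAsync`: `false` means the operation completed synchronously, the `Completed` event will not fire, and the caller must process the result itself.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

// Hypothetical demo class; not part of the library described in this article.
public static class AsyncAcceptDemo
{
    // Accepts one loopback connection, reads one short message with
    // SocketAsyncEventArgs, and returns the received text (null on failure).
    public static string RunOnce(string message)
    {
        using (var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        using (var done = new ManualResetEventSlim(false))
        {
            listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); // port 0: the OS picks a free port
            listener.Listen(1);
            int port = ((IPEndPoint)listener.LocalEndPoint).Port;

            string received = null;
            Action<string> onMessage = text => { received = text; done.Set(); };

            var acceptArgs = new SocketAsyncEventArgs();
            acceptArgs.Completed += (sender, e) => OnAccepted(e, onMessage);

            // false = completed synchronously: Completed is not raised, so handle it here.
            if (!listener.AcceptAsync(acceptArgs))
                OnAccepted(acceptArgs, onMessage);

            using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                client.Connect(IPAddress.Loopback, port);
                client.Send(Encoding.UTF8.GetBytes(message));
                done.Wait(TimeSpan.FromSeconds(5));
            }
            return received;
        }
    }

    private static void OnAccepted(SocketAsyncEventArgs acceptArgs, Action<string> onMessage)
    {
        if (acceptArgs.SocketError != SocketError.Success)
        {
            onMessage(null);
            return;
        }
        Socket peer = acceptArgs.AcceptSocket;
        acceptArgs.Dispose();

        var receiveArgs = new SocketAsyncEventArgs();
        receiveArgs.SetBuffer(new byte[1024], 0, 1024); // post a 1 KB receive
        receiveArgs.Completed += (sender, e) => OnReceived(peer, e, onMessage);
        if (!peer.ReceiveAsync(receiveArgs))
            OnReceived(peer, receiveArgs, onMessage);
    }

    private static void OnReceived(Socket peer, SocketAsyncEventArgs e, Action<string> onMessage)
    {
        string text = null;
        if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
            text = Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred);
        e.Dispose();
        peer.Close();
        onMessage(text);
    }
}
```

Calling `AsyncAcceptDemo.RunOnce("ping")` should hand back the same text once the receive completes. A real server would keep posting accepts and receives instead of stopping after one of each.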

 

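Asynchronous communication is much harder to get right than synchronous communication, and there are many pitfalls along the way. Without experience it is hard to finish.

I gathered a large amount of material and completed a wrapper around asynchronous sockets. The library has been running stably and efficiently for several months.

Looking across what is available online, I have not found a wrapper library I am satisfied with. Many articles mix data transmission with protocol handling, which makes the code very hard to follow and impossible to extend.

This library was written to avoid those flaws: the logic is layered and modular, and it achieves both ease of use and high performance.

To give a first impression of the throughput, here are the test results.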

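[Figure: test results]

Host configuration

[Figure: host configuration]

The 100 Mbit link is essentially saturated with total CPU usage at 40%. My machine sits at roughly 20% CPU when idle, so the program itself accounts for about 20%.

The library scales: even with 100,000 connections sending and receiving the same amount of data, CPU usage stays roughly the same.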

Library structure

[Figure: structure diagram of the library]

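Goals

It can act as a server (listening) or as a client (initiating connections).

It can accommodate any network protocol. Data is sent and received either as a byte stream or as complete packets; the library does not interpret protocol content.

Ease of use. The complex low-level handling is encapsulated behind a friendly public interface.

High performance. Processing is optimized as far as possible. A single machine can support tens of thousands of connections, with throughput reaching several hundred Mbit/s.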

Implementation approach

The network handling logic can be divided into the following parts:

Listening. The library can listen on multiple ports. The listener is responsible for producing sockets, which are then handed on for further processing. The listening module has a narrow job; it can be optimized further if necessary.

Active connection. The library can connect to a peer either asynchronously or synchronously. Once connected, the socket is handled in exactly the same way as a socket obtained from listening.

Socket send/receive handling. Each socket has its own send/receive instance, and this layer deals only with byte streams. Both directions are optimized: on send, queued data is coalesced into larger writes to improve throughput; on receive, each posted request reads up to 1 KB.

Packet assembly. Most packets carry a length indicator; for example, the first two bytes of the header give the length, and that value is enough to assemble a complete packet.
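The packet assembly step can be sketched as follows. This is an illustration under stated assumptions, not the library's `ClientPacketManage` class (which the article does not show): `LengthPrefixedAssembler` and `Feed` are hypothetical names, the two header bytes are assumed to be big-endian, and the length is assumed to count only the payload that follows the header.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical assembler for packets framed as [2-byte big-endian length][payload].
public sealed class LengthPrefixedAssembler
{
    private const int HeaderLen = 2;
    private readonly List<byte> _pending = new List<byte>();
    private readonly int _maxPayload;

    public LengthPrefixedAssembler(int maxPayload)
    {
        _maxPayload = maxPayload;
    }

    // Appends a chunk read from the stream and returns every payload that is now complete.
    // Bytes belonging to an unfinished packet stay buffered until the next call.
    public List<byte[]> Feed(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var packets = new List<byte[]>();

        while (_pending.Count >= HeaderLen)
        {
            int payloadLen = (_pending[0] << 8) | _pending[1];
            if (payloadLen > _maxPayload)
                throw new InvalidOperationException("Declared packet length exceeds the limit.");

            if (_pending.Count < HeaderLen + payloadLen)
                break; // header seen, payload still incomplete

            packets.Add(_pending.GetRange(HeaderLen, payloadLen).ToArray());
            _pending.RemoveRange(0, HeaderLen + payloadLen);
        }
        return packets;
    }
}
```

Because TCP delivers a byte stream, one read may contain half a packet or several packets at once; the loop handles both cases. The length check plays the same role as the maximum packet length in the library: a peer that sends a nonsensical length is rejected instead of making the buffer grow without bound.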

NetListener: listening

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
    class NetListener
    {
        private Socket listenSocket;
        public ListenParam _listenParam { get; set; }
        public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;

        bool start;

        NetServer _netServer;
        public NetListener(NetServer netServer)
        {
            _netServer = netServer;
        }

        // Number of accept requests currently posted and not yet completed.
        public int _acceptAsyncCount = 0;

        public bool StartListen()
        {
            try
            {
                start = true;
                IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
                listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                listenSocket.Bind(listenPoint);
                listenSocket.Listen(200);

                Thread thread1 = new Thread(new ThreadStart(NetProcess));
                thread1.Start();

                StartAccept();
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("**Listen error! {0}", ex.Message));
                return false;
            }
        }

        AutoResetEvent _acceptEvent = new AutoResetEvent(false);

        // Worker thread: hands accepted sockets on and keeps accept requests posted.
        private void NetProcess()
        {
            while (start)
            {
                DealNewAccept();
                _acceptEvent.WaitOne(1000 * 10);
            }
        }

        private void DealNewAccept()
        {
            try
            {
                if (_acceptAsyncCount <= 10)
                {
                    StartAccept();
                }

                while (true)
                {
                    AsyncSocketClient client = _newSocketClientList.GetObj();
                    if (client == null)
                        break;

                    DealNewAccept(client);
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("DealNewAccept error {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        private void DealNewAccept(AsyncSocketClient client)
        {
            client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
            OnAcceptSocket?.Invoke(_listenParam, client);
        }

        private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                Interlocked.Decrement(ref _acceptAsyncCount);
                _acceptEvent.Set();
                acceptEventArgs.Completed -= AcceptEventArg_Completed;
                ProcessAccept(acceptEventArgs);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        public bool StartAccept()
        {
            SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
            acceptEventArgs.Completed += AcceptEventArg_Completed;

            // Count the request before posting it, so a completion that fires
            // immediately on another thread cannot decrement the counter first.
            Interlocked.Increment(ref _acceptAsyncCount);
            bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);

            if (!willRaiseEvent)
            {
                // Completed synchronously: the Completed event will not be raised.
                Interlocked.Decrement(ref _acceptAsyncCount);
                _acceptEvent.Set();
                acceptEventArgs.Completed -= AcceptEventArg_Completed;
                ProcessAccept(acceptEventArgs);
            }
            return true;
        }

        ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();

        private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                using (acceptEventArgs)
                {
                    if (acceptEventArgs.AcceptSocket != null)
                    {
                        AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
                        client.CreateClientInfo(this);

                        _newSocketClientList.PutObj(client);
                        _acceptEvent.Set();
                    }
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
            }
        }
    }
}

NetConnectManage: connection handling

using System;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
    class NetConnectManage
    {
        public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;

        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            try
            {
                Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
                socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                socketEventArgs.Completed += SocketConnect_Completed;

                SocketClientInfo clientInfo = new SocketClientInfo();
                socketEventArgs.UserToken = clientInfo;
                clientInfo.PeerIp = peerIp;
                clientInfo.PeerPort = peerPort;
                clientInfo.Tag = tag;

                bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
                if (!willRaiseEvent)
                {
                    // Completed synchronously: handle the result here.
                    ProcessConnect(socketEventArgs);
                    socketEventArgs.Completed -= SocketConnect_Completed;
                    socketEventArgs.Dispose();
                }
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log("ConnectAsyn", ex);
                return false;
            }
        }

        private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
        {
            ProcessConnect(socketEventArgs);
            socketEventArgs.Completed -= SocketConnect_Completed;
            socketEventArgs.Dispose();
        }

        private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
        {
            SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
            if (socketEventArgs.SocketError == SocketError.Success)
            {
                DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
            }
            else
            {
                // Connection failed: report the event with a null socket and client.
                SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
                socketParam.ClientInfo = clientInfo;
                OnSocketConnectEvent?.Invoke(socketParam, null);
            }
        }

        void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
        {
            clientInfo.SetClientInfo(socket);

            AsyncSocketClient client = new AsyncSocketClient(socket);
            client.SetClientInfo(clientInfo);

            // raise the event
            SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
            socketParam.ClientInfo = clientInfo;
            OnSocketConnectEvent?.Invoke(socketParam, client);
        }

        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            socket = null;
            try
            {
                Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);

                SocketClientInfo clientInfo = new SocketClientInfo();
                clientInfo.PeerIp = peerIp;
                clientInfo.PeerPort = peerPort;
                clientInfo.Tag = tag;

                EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                socketTmp.Connect(remoteEP);
                if (!socketTmp.Connected)
                    return false;

                DealConnectSocket(socketTmp, clientInfo);
                socket = socketTmp;
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Failed to connect to peer ({0}:{1})!", peerIp, peerPort), ex);
                return false;
            }
        }
    }
}

AsyncSocketClient: socket send/receive handling

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
    public class AsyncSocketClient
    {
        // Size of each posted receive, and the maximum size of one queued send chunk.
        public static int IocpReadLen = 1024;

        public readonly Socket ConnectSocket;

        protected SocketAsyncEventArgs m_receiveEventArgs;
        public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
        protected byte[] m_asyncReceiveBuffer;

        protected SocketAsyncEventArgs m_sendEventArgs;
        public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
        protected byte[] m_asyncSendBuffer;

        public event Action<AsyncSocketClient, byte[]> OnReadData;
        public event Action<AsyncSocketClient, int> OnSendData;
        public event Action<AsyncSocketClient> OnSocketClose;

        static object releaseLock = new object();
        public static int createCount = 0;
        public static int releaseCount = 0;

        ~AsyncSocketClient()
        {
            lock (releaseLock)
            {
                releaseCount++;
            }
        }

        public AsyncSocketClient(Socket socket)
        {
            lock (releaseLock)
            {
                createCount++;
            }

            ConnectSocket = socket;

            m_receiveEventArgs = new SocketAsyncEventArgs();
            m_asyncReceiveBuffer = new byte[IocpReadLen];
            m_receiveEventArgs.AcceptSocket = ConnectSocket;
            m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;

            m_sendEventArgs = new SocketAsyncEventArgs();
            m_asyncSendBuffer = new byte[IocpReadLen * 2];
            m_sendEventArgs.AcceptSocket = ConnectSocket;
            m_sendEventArgs.Completed += SendEventArgs_Completed;
        }

        SocketClientInfo _clientInfo;

        public SocketClientInfo ClientInfo
        {
            get
            {
                return _clientInfo;
            }
        }

        internal void CreateClientInfo(NetListener netListener)
        {
            _clientInfo = new SocketClientInfo();
            try
            {
                _clientInfo.Tag = netListener._listenParam._tag;
                IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
                Debug.Assert(netListener._listenParam._port == ip.Port);

                _clientInfo.LocalIp = ip.Address.ToString();
                _clientInfo.LocalPort = netListener._listenParam._port;

                ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
                _clientInfo.PeerIp = ip.Address.ToString();
                _clientInfo.PeerPort = ip.Port;
            }
            catch (Exception ex)
            {
                NetLogger.Log("CreateClientInfo", ex);
            }
        }

        internal void SetClientInfo(SocketClientInfo clientInfo)
        {
            _clientInfo = clientInfo;
        }

        #region read process

        bool _inReadPending = false;

        public EN_SocketReadResult ReadNextData()
        {
            lock (this)
            {
                if (_socketError)
                    return EN_SocketReadResult.ReadError;
                if (_inReadPending)
                    return EN_SocketReadResult.InAsyn;
                if (!ConnectSocket.Connected)
                {
                    OnReadError();
                    return EN_SocketReadResult.ReadError;
                }

                try
                {
                    m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
                    _inReadPending = true;
                    bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); // post a receive request
                    if (!willRaiseEvent)
                    {
                        _inReadPending = false;
                        ProcessReceive();
                        if (_socketError)
                        {
                            OnReadError();
                            return EN_SocketReadResult.ReadError;
                        }
                        return EN_SocketReadResult.HaveRead;
                    }
                    else
                    {
                        return EN_SocketReadResult.InAsyn;
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log("ReadNextData", ex);
                    _inReadPending = false;
                    OnReadError();
                    return EN_SocketReadResult.ReadError;
                }
            }
        }

        private void ProcessReceive()
        {
            if (ReceiveEventArgs.BytesTransferred > 0
                && ReceiveEventArgs.SocketError == SocketError.Success)
            {
                int offset = ReceiveEventArgs.Offset;
                int count = ReceiveEventArgs.BytesTransferred;

                // Copy out of the shared receive buffer before it is reused.
                byte[] readData = new byte[count];
                Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);

                _inReadPending = false;
                if (!_socketError)
                    OnReadData?.Invoke(this, readData);
            }
            else
            {
                _inReadPending = false;
                OnReadError();
            }
        }

        private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
        {
            lock (this)
            {
                _inReadPending = false;
                ProcessReceive();
                if (_socketError)
                {
                    OnReadError();
                }
            }
        }

        bool _socketError = false;

        private void OnReadError()
        {
            lock (this)
            {
                if (_socketError == false)
                {
                    _socketError = true;
                    OnSocketClose?.Invoke(this);
                }
                CloseClient();
            }
        }

        #endregion

        #region send process

        int _sendBufferByteCount = 102400;

        public int SendBufferByteCount
        {
            get
            {
                return _sendBufferByteCount;
            }
            set
            {
                if (value < 1024)
                {
                    _sendBufferByteCount = 1024;
                }
                else
                {
                    _sendBufferByteCount = value;
                }
            }
        }

        SendBufferPool _sendDataPool = new SendBufferPool();

        internal EN_SendDataResult PutSendData(byte[] data)
        {
            if (_socketError)
                return EN_SendDataResult.no_client;

            if (_sendDataPool._bufferByteCount >= _sendBufferByteCount)
            {
                return EN_SendDataResult.buffer_overflow;
            }

            if (data.Length <= IocpReadLen)
            {
                _sendDataPool.PutObj(data);
            }
            else
            {
                List<byte[]> dataItems = SplitData(data, IocpReadLen);
                foreach (byte[] item in dataItems)
                {
                    _sendDataPool.PutObj(item);
                }
            }
            return EN_SendDataResult.ok;
        }

        bool _inSendPending = false;

        public EN_SocketSendResult SendNextData()
        {
            lock (this)
            {
                if (_socketError)
                {
                    return EN_SocketSendResult.SendError;
                }
                if (_inSendPending)
                {
                    return EN_SocketSendResult.InAsyn;
                }

                int sendByteCount = GetSendData();
                if (sendByteCount == 0)
                {
                    return EN_SocketSendResult.NoSendData;
                }

                // Check first to avoid an exception, which would hurt performance.
                if (!ConnectSocket.Connected)
                {
                    OnSendError();
                    return EN_SocketSendResult.SendError;
                }

                try
                {
                    m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);
                    _inSendPending = true;
                    bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);
                    if (!willRaiseEvent)
                    {
                        _inSendPending = false;
                        ProcessSend(m_sendEventArgs);
                        if (_socketError)
                        {
                            OnSendError();
                            return EN_SocketSendResult.SendError;
                        }
                        else
                        {
                            OnSendData?.Invoke(this, sendByteCount);
                            // continue with the next item
                            return EN_SocketSendResult.HaveSend;
                        }
                    }
                    else
                    {
                        return EN_SocketSendResult.InAsyn;
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log("SendNextData", ex);
                    _inSendPending = false;
                    OnSendError();
                    return EN_SocketSendResult.SendError;
                }
            }
        }

        private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
        {
            lock (this)
            {
                try
                {
                    _inSendPending = false;
                    ProcessSend(m_sendEventArgs);

                    int sendCount = 0;
                    if (sendEventArgs.SocketError == SocketError.Success)
                    {
                        sendCount = sendEventArgs.BytesTransferred;
                    }
                    OnSendData?.Invoke(this, sendCount);

                    if (_socketError)
                    {
                        OnSendError();
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log("SendEventArgs_Completed", ex);
                }
            }
        }

        private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
        {
            if (sendEventArgs.SocketError == SocketError.Success)
            {
                return true;
            }
            else
            {
                OnSendError();
                return false;
            }
        }

        // Coalesces queued chunks into the send buffer. Each chunk is at most
        // IocpReadLen bytes and the buffer holds 2 * IocpReadLen, so stopping once
        // the total exceeds IocpReadLen can never overflow the buffer.
        private int GetSendData()
        {
            int dataLen = 0;
            while (true)
            {
                byte[] data = _sendDataPool.GetObj();
                if (data == null)
                    return dataLen;
                Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
                dataLen += data.Length;
                if (dataLen > IocpReadLen)
                    break;
            }
            return dataLen;
        }

        private void OnSendError()
        {
            lock (this)
            {
                if (_socketError == false)
                {
                    _socketError = true;
                    OnSocketClose?.Invoke(this);
                }
                CloseClient();
            }
        }

        #endregion

        internal void CloseSocket()
        {
            try
            {
                ConnectSocket.Close();
            }
            catch (Exception ex)
            {
                NetLogger.Log("CloseSocket", ex);
            }
        }

        static object socketCloseLock = new object();
        public static int closeSendCount = 0;
        public static int closeReadCount = 0;

        bool _disposeSend = false;

        void CloseSend()
        {
            if (!_disposeSend && !_inSendPending)
            {
                lock (socketCloseLock)
                    closeSendCount++;

                _disposeSend = true;
                m_sendEventArgs.SetBuffer(null, 0, 0);
                m_sendEventArgs.Completed -= SendEventArgs_Completed;
                m_sendEventArgs.Dispose();
            }
        }

        bool _disposeRead = false;

        void CloseRead()
        {
            if (!_disposeRead && !_inReadPending)
            {
                lock (socketCloseLock)
                    closeReadCount++;

                _disposeRead = true;
                m_receiveEventArgs.SetBuffer(null, 0, 0);
                m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
                m_receiveEventArgs.Dispose();
            }
        }

        private void CloseClient()
        {
            try
            {
                CloseSend();
                CloseRead();
                ConnectSocket.Close();
            }
            catch (Exception ex)
            {
                NetLogger.Log("CloseClient", ex);
            }
        }

        // Splits data into chunks of at most maxLen bytes.
        private List<byte[]> SplitData(byte[] data, int maxLen)
        {
            List<byte[]> items = new List<byte[]>();

            int start = 0;
            while (true)
            {
                int itemLen = Math.Min(maxLen, data.Length - start);
                if (itemLen == 0)
                    break;
                byte[] item = new byte[itemLen];
                Array.Copy(data, start, item, 0, itemLen);
                items.Add(item);

                start += itemLen;
            }
            return items;
        }
    }

    public enum EN_SocketReadResult
    {
        InAsyn,
        HaveRead,
        ReadError
    }

    public enum EN_SocketSendResult
    {
        InAsyn,
        HaveSend,
        NoSendData,
        SendError
    }

    // Queue of pending send chunks that also tracks the total number of queued bytes.
    class SendBufferPool
    {
        ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();

        public Int64 _bufferByteCount = 0;

        public bool PutObj(byte[] obj)
        {
            if (_bufferPool.PutObj(obj))
            {
                lock (this)
                {
                    _bufferByteCount += obj.Length;
                }
                return true;
            }
            else
            {
                return false;
            }
        }

        public byte[] GetObj()
        {
            byte[] result = _bufferPool.GetObj();
            if (result != null)
            {
                lock (this)
                {
                    _bufferByteCount -= result.Length;
                }
            }
            return result;
        }
    }
}
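The send path in the listing above batches several queued chunks into one buffer before each `SendAsync` call, so many small writes become one larger one. The idea can be sketched in isolation as follows. This is a standalone variant, not the library's code: `SendCoalescer` and `Fill` are hypothetical names, and unlike `GetSendData` it stops before the buffer would overflow instead of relying on a double-sized buffer. It assumes that no single chunk is larger than the send buffer, which the library guarantees by splitting large payloads first.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating send-side batching; not part of the library.
public static class SendCoalescer
{
    // Copies queued chunks into sendBuffer until the next chunk would not fit.
    // Copied chunks are removed from the queue; the rest wait for the next send.
    // Returns the number of bytes written into sendBuffer.
    public static int Fill(Queue<byte[]> pending, byte[] sendBuffer)
    {
        int written = 0;
        while (pending.Count > 0)
        {
            byte[] next = pending.Peek();
            if (written + next.Length > sendBuffer.Length)
                break; // leave it queued for the next send

            Buffer.BlockCopy(next, 0, sendBuffer, written, next.Length);
            written += next.Length;
            pending.Dequeue();
        }
        return written;
    }
}
```

With an 8-byte buffer and queued chunks of 3, 4 and 5 bytes, the first call batches the first two chunks and leaves the third for the following send. Peeking before dequeuing means a chunk is never taken out of the queue unless it has actually been copied.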

NetServer: aggregating the other classes

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
    public class NetServer
    {
        public Action<SocketEventParam> OnSocketPacketEvent;

        // send buffer size per connection
        public int SendBufferBytePerClient { get; set; } = 1024 * 100;

        bool _serverStart = false;
        List<NetListener> _listListener = new List<NetListener>();

        // assembles the received byte stream into complete packets
        ClientPacketManage _clientPacketManage;

        public Int64 SendByteCount { get; set; }
        public Int64 ReadByteCount { get; set; }

        List<ListenParam> _listListenPort = new List<ListenParam>();

        public void AddListenPort(int port, object tag)
        {
            _listListenPort.Add(new ListenParam(port, tag));
        }

        /// <summary>
        /// Starts listening on every port added with AddListenPort.
        /// </summary>
        /// <param name="listenFault">ports on which listening failed</param>
        /// <returns></returns>
        public bool StartListen(out List<int> listenFault)
        {
            _serverStart = true;

            _clientPacketManage = new ClientPacketManage(this);
            _clientPacketManage.OnSocketPacketEvent += PutClientPacket;

            _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;

            _listListener.Clear();
            Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
            thread1.Start();

            Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
            thread2.Start();

            Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
            thread3.Start();

            listenFault = new List<int>();
            foreach (ListenParam param in _listListenPort)
            {
                NetListener listener = new NetListener(this);
                listener._listenParam = param;
                listener.OnAcceptSocket += Listener_OnAcceptSocket;
                if (!listener.StartListen())
                {
                    listenFault.Add(param._port);
                }
                else
                {
                    _listListener.Add(listener);
                    NetLogger.Log(string.Format("Listening started! Port: {0}", param._port));
                }
            }

            return listenFault.Count == 0;
        }

        public void PutClientPacket(SocketEventParam param)
        {
            OnSocketPacketEvent?.Invoke(param);
        }

        // minimum and maximum packet length
        int _packetMinLen;
        int _packetMaxLen;

        public int PacketMinLen
        {
            get { return _packetMinLen; }
        }

        public int PacketMaxLen
        {
            get { return _packetMaxLen; }
        }

        /// <summary>
        /// Sets the minimum and maximum packet length.
        /// When minLen = 0, received data is treated as a raw byte stream.
        /// </summary>
        /// <param name="minLen"></param>
        /// <param name="maxLen"></param>
        public void SetPacketParam(int minLen, int maxLen)
        {
            Debug.Assert(minLen >= 0);
            Debug.Assert(maxLen > minLen);
            _packetMinLen = minLen;
            _packetMaxLen = maxLen;
        }

        // returns the total length of the packet that starts at offset
        public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
        public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

        ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();

        private void NetPacketProcess()
        {
            while (_serverStart)
            {
                try
                {
                    DealEventPool();
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("DealEventPool error {0}***{1}", ex.Message, ex.StackTrace));
                }
                _socketEventPool.WaitOne(1000);
            }
        }

        Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();

        public int ClientCount
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Count;
                }
            }
        }

        public List<Socket> ClientList
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Keys.ToList();
                }
            }
        }

        private void DealEventPool()
        {
            while (true)
            {
                SocketEventParam param = _socketEventPool.GetObj();
                if (param == null)
                    return;

                if (param.SocketEvent == EN_SocketEvent.close)
                {
                    lock (_clientGroup)
                    {
                        _clientGroup.Remove(param.Socket);
                    }
                }

                if (_packetMinLen == 0) // byte-stream mode
                {
                    OnSocketPacketEvent?.Invoke(param);
                }
                else
                {
                    // packet mode: assemble complete packets first
                    _clientPacketManage.PutSocketParam(param);
                }
            }
        }

        private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
        {
            try
            {
                if (param.Socket == null || client == null) // connection failed
                {
                }
                else
                {
                    lock (_clientGroup)
                    {
                        bool remove = _clientGroup.Remove(client.ConnectSocket);
                        Debug.Assert(!remove);
                        _clientGroup.Add(client.ConnectSocket, client);
                    }

                    client.OnSocketClose += Client_OnSocketClose;
                    client.OnReadData += Client_OnReadData;
                    client.OnSendData += Client_OnSendData;

                    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
                }
                _socketEventPool.PutObj(param);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("SocketConnectEvent error {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
        {
            try
            {
                lock (_clientGroup)
                {
                    if (!_clientGroup.ContainsKey(socket))
                    {
                        Debug.Assert(false);
                        return;
                    }

                    NetLogger.Log(string.Format("Invalid packet length! Length: {0}", packetLen));

                    AsyncSocketClient client = _clientGroup[socket];
                    client.CloseSocket();
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("OnRcvPacketLenError error {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        #region listen port

        private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
        {
            try
            {
                lock (_clientGroup)
                {
                    bool remove = _clientGroup.Remove(client.ConnectSocket);
                    Debug.Assert(!remove);
                    _clientGroup.Add(client.ConnectSocket, client);
                }

                client.OnSocketClose += Client_OnSocketClose;
                client.OnReadData += Client_OnReadData;
                client.OnSendData += Client_OnSendData;

                _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

                SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;

                _socketEventPool.PutObj(param);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Listener_OnAcceptSocket error {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();

        private void NetSendProcess()
        {
            while (true)
            {
                DealSendEvent();
                _listSendEvent.WaitOne(1000);
            }
        }

        ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();

        private void NetReadProcess()
        {
            while (true)
            {
                DealReadEvent();
                _listReadEvent.WaitOne(1000);
            }
        }

        private void DealSendEvent()
        {
            while (true)
            {
                SocketEventDeal item = _listSendEvent.GetObj();
                if (item == null)
                    break;
                switch (item.SocketEvent)
                {
                    case EN_SocketDealEvent.send:
                        {
                            // Keep sending while sends complete synchronously.
                            while (true)
                            {
                                EN_SocketSendResult result = item.Client.SendNextData();
                                if (result == EN_SocketSendResult.HaveSend)
                                    continue;
                                else
                                    break;
                            }
                        }
                        break;
                    case EN_SocketDealEvent.read:
                        {
                            Debug.Assert(false);
                        }
                        break;
                }
            }
        }

        private void DealReadEvent()
        {
            while (true)
            {
                SocketEventDeal item = _listReadEvent.GetObj();
                if (item == null)
                    break;
                switch (item.SocketEvent)
                {
                    case EN_SocketDealEvent.read:
                        {
                            // Keep reading while receives complete synchronously.
                            while (true)
                            {
                                EN_SocketReadResult result = item.Client.ReadNextData();
                                if (result == EN_SocketReadResult.HaveRead)
                                    continue;
                                else
                                    break;
                            }
                        }
                        break;
                    case EN_SocketDealEvent.send:
                        {
                            Debug.Assert(false);
                        }
                        break;
                }
            }
        }

        private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
        {
            // queue the next read
            _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

            try
            {
                SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;
                param.Data = readData;
                _socketEventPool.PutObj(param);

                lock (this)
                {
                    ReadByteCount += readData.Length;
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Client_OnReadData error {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        #endregion

        private void Client_OnSendData(AsyncSocketClient client, int sendCount)
        {
            // queue the next send
            _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));

            lock (this)
            {
                SendByteCount += sendCount;
            }
        }

        private void Client_OnSocketClose(AsyncSocketClient client)
        {
            try
            {
                SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;
                _socketEventPool.PutObj(param);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Client_OnSocketClose error {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        /// <summary>
        /// Puts data into the send buffer of the given connection.
        /// </summary>
        /// <param name="socket"></param>
        /// <param name="data"></param>
        /// <returns></returns>
        public EN_SendDataResult SendData(Socket socket, byte[] data)
        {
            if (socket == null)
                return EN_SendDataResult.no_client;

            lock (_clientGroup)
            {
                if (!_clientGroup.ContainsKey(socket))
                    return EN_SendDataResult.no_client;

                AsyncSocketClient client = _clientGroup[socket];
                EN_SendDataResult result = client.PutSendData(data);
                if (result == EN_SendDataResult.ok)
                {
                    // queue the next send
                    _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
                }
                return result;
            }
        }

        /// <summary>
        /// Sets the send buffer size of a given connection.
        /// </summary>
        /// <param name="socket"></param>
        /// <param name="byteCount"></param>
        /// <returns></returns>
        public bool SetClientSendBuffer(Socket socket, int byteCount)
        {
            lock (_clientGroup)
            {
                if (!_clientGroup.ContainsKey(socket))
                    return false;

                AsyncSocketClient client = _clientGroup[socket];
                client.SendBufferByteCount = byteCount;
                return true;
            }
        }

        #region connect process

        NetConnectManage _netConnectManage = new NetConnectManage();

        /// <summary>
        /// Connects to a peer asynchronously.
        /// </summary>
        /// <param name="peerIp"></param>
        /// <param name="peerPort"></param>
        /// <param name="tag"></param>
        /// <returns></returns>
        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
        }

        /// <summary>
        /// Connects to a peer synchronously.
        /// </summary>
        /// <param name="peerIp"></param>
        /// <param name="peerPort"></param>
        /// <param name="tag"></param>
        /// <param name="socket"></param>
        /// <returns></returns>
        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
        }

        #endregion
    }

    enum EN_SocketDealEvent
    {
        read,
        send,
    }

    class SocketEventDeal
    {
        public AsyncSocketClient Client { get; set; }
        public EN_SocketDealEvent SocketEvent { get; set; }

        public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
        {
            Client = client;
            SocketEvent = socketEvent;
        }
    }
}
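The listings rely on `ObjectPool<T>` and `ObjectPoolWithEvent<T>`, which the article does not show. Judging only from how they are called (`PutObj`, a `GetObj` that returns null when empty, and `WaitOne` with a timeout), they appear to be thread-safe FIFO queues, the second one with a wake-up signal for the worker threads. The sketch below is a guess at a minimal implementation under that assumption; the real classes may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Assumed shape of ObjectPool<T>: a thread-safe FIFO queue.
// Reconstructed from usage only; the article does not show the real class.
public class ObjectPool<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();

    // Adds an item to the tail of the queue; null items are rejected.
    public virtual bool PutObj(T item)
    {
        if (item == null)
            return false;
        lock (_items)
        {
            _items.Enqueue(item);
        }
        return true;
    }

    // Removes and returns the oldest item, or null when the queue is empty.
    public T GetObj()
    {
        lock (_items)
        {
            return _items.Count > 0 ? _items.Dequeue() : null;
        }
    }
}

// Assumed shape of ObjectPoolWithEvent<T>: the same queue plus a signal,
// so a worker thread can sleep until new work is queued.
public class ObjectPoolWithEvent<T> : ObjectPool<T> where T : class
{
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);

    public override bool PutObj(T item)
    {
        bool added = base.PutObj(item);
        if (added)
            _signal.Set(); // wake the consumer
        return added;
    }

    // Blocks until an item has been queued or the timeout elapses.
    public bool WaitOne(int millisecondsTimeout)
    {
        return _signal.WaitOne(millisecondsTimeout);
    }
}
```

This matches the worker loops in `NetServer`: each loop drains the queue with `GetObj` until it returns null, then calls `WaitOne(1000)`. Because the event is auto-reset, several `PutObj` calls may produce a single wake-up, which is why the loops drain the whole queue each time and use a timeout as a safety net.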

Using the library

Usage is very simple. An example follows.

using IocpCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Windows;

namespace WarningClient
{
    public class SocketServer
    {
        public Action<SocketEventParam> OnSocketEvent;

        public Int64 SendByteCount
        {
            get
            {
                if (_netServer == null)
                    return 0;
                return _netServer.SendByteCount;
            }
        }

        public Int64 ReadByteCount
        {
            get
            {
                if (_netServer == null)
                    return 0;
                return _netServer.ReadByteCount;
            }
        }

        NetServer _netServer;
        EN_PacketType _packetType = EN_PacketType.byteStream;

        public void SetPacktType(EN_PacketType packetType)
        {
            _packetType = packetType;
            if (_netServer == null)
                return;
            if (packetType == EN_PacketType.byteStream)
            {
                // minLen = 0 selects byte-stream mode
                _netServer.SetPacketParam(0, 1024);
            }
            else
            {
                _netServer.SetPacketParam(9, 1024);
            }
        }

        public bool Init(List<int> listenPort)
        {
            NetLogger.OnLogEvent += NetLogger_OnLogEvent;
            _netServer = new NetServer();
            SetPacktType(_packetType);
            _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
            _netServer.OnSocketPacketEvent += SocketPacketDeal;

            foreach (int n in listenPort)
            {
                _netServer.AddListenPort(n, n);
            }

            List<int> listenFault;
            bool start = _netServer.StartListen(out listenFault);
            return start;
        }

        int GetPacketTotalLen(byte[] data, int offset)
        {
            if (MainWindow._packetType == EN_PacketType.znss)
                return GetPacketZnss(data, offset);
            else
                return GetPacketAnzhiyuan(data, offset);
        }

        int GetPacketAnzhiyuan(byte[] data, int offset)
        {
            int n = data[offset + 5] + 6;
            return n;
        }

        int GetPacketZnss(byte[] data, int offset)
        {
            // Read the length byte relative to offset; the original indexed data[4],
            // which is only correct when the packet starts at the beginning of the buffer.
            int packetLen = (int)(data[offset + 4]) + 5;
            return packetLen;
        }

        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            return _netServer.ConnectAsyn(peerIp, peerPort, tag);
        }

        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            return _netServer.Connect(peerIp, peerPort, tag, out socket);
        }

        private void NetLogger_OnLogEvent(string message)
        {
            AppLog.Log(message);
        }

        Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();

        public int ClientCount
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Count;
                }
            }
        }

        public List<Socket> ClientList
        {
            get
            {
                if (_netServer != null)
                    return _netServer.ClientList;
                return new List<Socket>();
            }
        }

        void AddClient(SocketEventParam socketParam)
        {
            lock (_clientGroup)
            {
                _clientGroup.Remove(socketParam.Socket);
                _clientGroup.Add(socketParam.Socket, socketParam);
            }
        }

        void RemoveClient(SocketEventParam socketParam)
        {
            lock (_clientGroup)
            {
                _clientGroup.Remove(socketParam.Socket);
            }
        }

        ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();

        public ObjectPool<SocketEventParam> ReadDataPool
        {
            get
            {
                return _readDataPool;
            }
        }

        private void SocketPacketDeal(SocketEventParam socketParam)
        {
            OnSocketEvent?.Invoke(socketParam);
            if (socketParam.SocketEvent == EN_SocketEvent.read)
            {
                if (MainWindow._isShowReadPacket)
                    _readDataPool.PutObj(socketParam);
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.accept)
            {
                AddClient(socketParam);
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                AppLog.Log(string.Format("Client connected! Local port: {0}, peer: {1}",
                    socketParam.ClientInfo.LocalPort, peerIp));
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.connect)
            {
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                if (socketParam.Socket != null)
                {
                    AddClient(socketParam);

                    AppLog.Log(string.Format("Connected to peer! Local port: {0}, peer: {1}",
                        socketParam.ClientInfo.LocalPort, peerIp));
                }
                else
                {
                    AppLog.Log(string.Format("Failed to connect to peer! Local port: {0}, peer: {1}",
                        socketParam.ClientInfo.LocalPort, peerIp));
                }
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.close)
            {
                MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
                RemoveClient(socketParam);
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                AppLog.Log(string.Format("Client disconnected! Local port: {0}, peer: {1}",
                    socketParam.ClientInfo.LocalPort, peerIp));
            }
        }

        public EN_SendDataResult SendData(Socket socket, byte[] data)
        {
            if (socket == null)
            {
                MessageBox.Show("Not connected yet!");
                return EN_SendDataResult.no_client;
            }
            return _netServer.SendData(socket, data);
        }

        internal void SendToAll(byte[] data)
        {
            lock (_clientGroup)
            {
                foreach (Socket socket in _clientGroup.Keys)
                {
                    SendData(socket, data);
                }
            }
        }
    }
}
