之前的文章講述了socket通信的一些基本知識,已經(jīng)本人自定義的C#版本的socket、和java netty 庫的二次封裝,但是沒有真正的發(fā)表測試用例。
本文只是為了講解利用PRotobuf 進(jìn)行C# 和 java的通信。以及完整的實例代碼
java 代碼 svn 地址,本人開發(fā)工具是NetBeans 8.0.2 使用 maven 項目編譯
http://code.taobao.org/svn/flynetwork_csharp/trunk/BlogTest
c# 代碼 svn 地址 使用的是 vs 2013 .net 4.5
http://code.taobao.org/svn/flynetwork_csharp/trunk/Flynetwork/BlogTest
編譯工具下載
http://files.VEVb.com/files/ty408/Sz.ExcelTools.zip
本文著重以C# socket作為服務(wù)器端,java netty作為socket的客戶端進(jìn)行訪問通信
首先附上proto的message文件
package Sz.Test.ProtoMessage;//登陸消息message TestMessage { //消息枚舉 enum Proto_Login { ResTip = 101201;//服務(wù)器推送提示 ReqLogin = 101102;//客戶端申請登陸 ReqChat = 101103;//客戶端申請聊天消息 ResChat = 101203;//服務(wù)器推送聊天消息 } //服務(wù)器推送提示 ResTip message ResTipMessage { required string msg = 1;//提示內(nèi)容 } //客戶端申請登陸 ReqLogin message ReqLoginMessage { required string userName = 1;//登陸用戶名 required string userPwd = 2;//登陸密碼 } //客戶端申請登陸 ReqChat message ReqChatMessage { required string msg = 1;//提示內(nèi)容 } //客戶端申請登陸 ResChat message ResChatMessage { required string msg = 1;//提示內(nèi)容 }}
本人編譯工具自帶生產(chǎn)消息,和對應(yīng)的handler
先把proto文件編譯生產(chǎn)后,放到哪里,然后創(chuàng)建服務(wù)器監(jiān)聽代碼
上一篇文章講到由于java和C#默認(rèn)網(wǎng)絡(luò)端緒不一樣,java是標(biāo)準(zhǔn)端緒大端序,C#使用的小端序。
1 MarshalEndian.JN = MarshalEndian.JavaOrNet.Java;2 Sz.Network.SocketPool.ListenersBox.Instance.SetParams(new MessagePool(), typeof(MarshalEndian));3 Sz.Network.SocketPool.ListenersBox.Instance.Start("tcp:*:9527");
所以在我開啟服務(wù)器監(jiān)聽的時候設(shè)置解碼器和編碼器的解析風(fēng)格為java
然后建立一個文件chat文件夾用于存放handler文件就是剛才工具生成 目錄下的 ExcelSource/protobuf/net/Handler
這一系列文件
1 if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqLogin) 2 { 3 //構(gòu)建消息 4 Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqLoginMessage(); 5 object msg = DeSerialize(message.MsgBuffer, loginmessage); 6 //構(gòu)建handler 7 Test.ProtoMessage.ReqLoginHandler handler = new Test.ProtoMessage.ReqLoginHandler(); 8 handler.session = client; 9 handler.Message = loginmessage;10 //把handler交給 登錄 線程處理11 ThreadManager.Instance.AddTask(ServerManager.LoginThreadID, handler);12 }13 else if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqChat)14 {15 //構(gòu)建消息16 Sz.Test.ProtoMessage.TestMessage.ReqChatMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqChatMessage();17 object msg = DeSerialize(message.MsgBuffer, loginmessage);18 //構(gòu)建handler19 Test.ProtoMessage.ReqChatHandler handler = new Test.ProtoMessage.ReqChatHandler();20 handler.Session = client;21 handler.Message = loginmessage;22 //把handler交給 聊天 線程處理23 ThreadManager.Instance.AddTask(ServerManager.ChatThreadID, handler);24 }
收到消息后的處理判斷傳入的消息id是什么類型的,然后對應(yīng)反序列化byte[]數(shù)組為消息
最后把消息和生成handler移交到對應(yīng)的線程處理
登錄的消息全部交給 LoginThread 線程 去處理 ,這樣在真實的運行環(huán)境下,能保證單點登錄問題;
聊天消息全部交給 ChatThread 線程 去處理 這樣的好處是,聊天與登錄無關(guān);
收到登錄消息的處理
1 public class ReqLoginHandler : TcpHandler 2 { 3 public override void Run() 4 { 5 6 var message = (Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage)this.Message; 7 Sz.Test.ProtoMessage.TestMessage.ResTipMessage tip = new TestMessage.ResTipMessage(); 8 if (message.userName == "admin" && message.userPwd == "admin") 9 {10 Logger.Debug("收到登錄消息 登錄完成"); 11 tip.msg = "登錄完成";12 }13 else14 {15 Logger.Debug("收到登錄消息 用戶名或者密碼錯誤"); 16 tip.msg = "用戶名或者密碼錯誤";17 }18 byte[] buffer = MessagePool.Serialize(tip);19 this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResTip, buffer));20 }21 }
收到聊天消息的處理
1 public class ReqChatHandler : TcpHandler 2 { 3 public override void Run() 4 { 5 var message = (Sz.Test.ProtoMessage.TestMessage.ReqChatMessage)this.Message; 6 Logger.Debug("收到來自客戶端聊天消息:" + message.msg); 7 Sz.Test.ProtoMessage.TestMessage.ResChatMessage chat = new TestMessage.ResChatMessage(); 8 chat.msg = "服務(wù)器廣播:" + message.msg; 9 byte[] buffer = MessagePool.Serialize(chat);10 this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResChat, buffer));11 }12 }
接下來我們構(gòu)建
java版本基于netty 二次封裝的socket客戶端
1 package sz.network.socketpool.nettypool; 2 3 import Sz.Test.ProtoMessage.Test.TestMessage; 4 import com.google.protobuf.InvalidProtocolBufferException; 5 import io.netty.channel.ChannelHandlerContext; 6 import java.io.BufferedReader; 7 import java.io.IOException; 8 import java.io.InputStreamReader; 9 import java.util.logging.Level;10 import org.apache.log4j.Logger;11 12 /**13 *14 * @author Administrator15 */16 public class TestClient {17 18 static final Logger log = Logger.getLogger(TestClient.class);19 static NettyTcpClient client = null;20 21 public static void main(String[] args) {22 client = new NettyTcpClient("127.0.0.1", 9527, true, new NettyMessageHandler() {23 24 @Override25 public void channelActive(ChannelHandlerContext session) {26 log.info("連接服務(wù)器成功:");27 //構(gòu)建錯誤的登錄消息28 TestMessage.ReqLoginMessage.Builder newBuilder = TestMessage.ReqLoginMessage.newBuilder();29 newBuilder.setUserName("a");30 newBuilder.setUserPwd("a");31 //發(fā)送消息32 TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder.build().toByteArray()));33 34 //構(gòu)建正確的登錄消息35 TestMessage.ReqLoginMessage.Builder newBuilder1 = TestMessage.ReqLoginMessage.newBuilder();36 newBuilder1.setUserName("admin");37 newBuilder1.setUserPwd("admin");38 TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder1.build().toByteArray()));39 }40 41 @Override42 public void readMessage(NettyMessageBean msg) {43 try {44 if (msg.getMsgid() == TestMessage.Proto_Login.ResTip_VALUE) {45 TestMessage.ResTipMessage tipmessage = TestMessage.ResTipMessage.parseFrom(msg.getMsgbuffer());46 log.info("收到提示信息:" + tipmessage.getMsg());47 } else if (msg.getMsgid() == TestMessage.Proto_Login.ResChat_VALUE) {48 TestMessage.ResChatMessage tipmessage = TestMessage.ResChatMessage.parseFrom(msg.getMsgbuffer());49 log.info("收到聊天消息:" + tipmessage.getMsg());50 }51 } catch (InvalidProtocolBufferException ex) {52 log.error("收到消息:" + msg.getMsgid() + " 解析出錯:" + ex);53 }54 }55 56 @Override57 public void closeSession(ChannelHandlerContext session) {58 log.info("連接關(guān)閉或者連接不成功:");59 }60 61 @Override62 public void exceptionCaught(ChannelHandlerContext session, Throwable cause) {63 log.info("錯誤:" + cause.toString());64 }65 });66 client.Connect();67 68 BufferedReader strin = new BufferedReader(new InputStreamReader(System.in));69 while (true) {70 try {71 String str = strin.readLine();72 //構(gòu)建聊天消息73 TestMessage.ReqChatMessage.Builder chatmessage = TestMessage.ReqChatMessage.newBuilder();74 chatmessage.setMsg(str);75 TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqChat_VALUE, chatmessage.build().toByteArray()));76 } catch (IOException ex) {77 }78 }79 80 }81 82 }
接下來我們看看效果
我設(shè)置了斷線重連功能,我們來測試一下,把服務(wù)器關(guān)閉
可以看到?jīng)]3秒向服務(wù)器發(fā)起一次請求;
知道服務(wù)器再次開啟鏈接成功
完整的通信示例演示就完了;
代碼我不在上傳了,請各位使用svn下載好么????
需要注意的是,消息的解碼器和編碼器,一定要雙方都遵守你自己的契約。比如我在編碼消息格式的時候先寫入消息包的長度,然后跟上消息的id,再是消息的內(nèi)容
所以解碼的時候,先讀取一個消息長度,在讀取一個消息id,如果本次收到的消息字節(jié)數(shù)不夠長度那么留存起來以用于下一次收到字節(jié)數(shù)組追加后再一起解析。
這樣就能解決粘包的問題。
附上C#版本的解析器
1 using System; 2 using System.Collections.Generic; 3 using System.IO; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 8 /** 9 * 10 * @author 失足程序員 11 * @Blog http://www.survivalescaperooms.com/ty408/ 12 * @mail 492794628@QQ.com 13 * @phone 13882122019 14 * 15 */ 16 namespace Sz.Network.SocketPool 17 { 18 public class MarshalEndian : IMarshalEndian 19 { 20 21 public enum JavaOrNet 22 { 23 Java, 24 Net, 25 } 26 27 public MarshalEndian() 28 { 29 30 } 31 32 public static JavaOrNet JN = JavaOrNet.Net; 33 34 /// <summary> 35 /// 讀取大端序的int 36 /// </summary> 37 /// <param name="value"></param> 38 public int ReadInt(byte[] intbytes) 39 { 40 Array.Reverse(intbytes); 41 return BitConverter.ToInt32(intbytes, 0); 42 } 43 44 /// <summary> 45 /// 寫入大端序的int 46 /// </summary> 47 /// <param name="value"></param> 48 public byte[] WriterInt(int value) 49 { 50 byte[] bs = BitConverter.GetBytes(value); 51 Array.Reverse(bs); 52 return bs; 53 } 54 55 //用于存儲剩余未解析的字節(jié)數(shù) 56 private List<byte> _LBuff = new List<byte>(2); 57 58 //字節(jié)數(shù)常量一個消息id4個字節(jié) 59 const long ConstLenght = 4L; 60 61 public void Dispose() 62 { 63 this.Dispose(true); 64 GC.SuppressFinalize(this); 65 } 66 67 protected virtual void Dispose(bool flag1) 68 { 69 if (flag1) 70 { 71 IDisposable disposable = this._LBuff as IDisposable; 72 if (disposable != null) { disposable.Dispose(); } 73 } 74 } 75 76 public byte[] Encoder(SocketMessage msg) 77 { 78 MemoryStream ms = new MemoryStream(); 79 BinaryWriter bw = new BinaryWriter(ms, UTF8Encoding.Default); 80 byte[] msgBuffer = msg.MsgBuffer; 81 82 if (msgBuffer != null) 83 { 84 switch (JN) 85 { 86 case JavaOrNet.Java: 87 bw.Write(WriterInt(msgBuffer.Length + 4)); 88 bw.Write(WriterInt(msg.MsgID)); 89 break; 90 case JavaOrNet.Net: 91 bw.Write((Int32)(msgBuffer.Length + 4)); 92 bw.Write(msg.MsgID); 93 break; 94 } 95 96 bw.Write(msgBuffer); 97 } 98 else 99 {100 switch (JN)101 {102 case JavaOrNet.Java:103 bw.Write(WriterInt(0));104 break;105 case JavaOrNet.Net:106 bw.Write((Int32)0);107 break;108 }109 }110 bw.Close();111 ms.Close();112 bw.Dispose();113 ms.Dispose();114 return ms.ToArray();115 }116 117 public List<SocketMessage> Decoder(byte[] buff, int len)118 {119 //拷貝本次的有效字節(jié)120 byte[] _b = new byte[len];121 Array.Copy(buff, 0, _b, 0, _b.Length);122 buff = _b;123 if (this._LBuff.Count > 0)124 {125 //拷貝之前遺留的字節(jié)126 this._LBuff.AddRange(_b);127 buff = this._LBuff.ToArray();128 this._LBuff.Clear();129 this._LBuff = new List<byte>(2);130 }131 List<SocketMessage> list = new List<SocketMessage>();132 MemoryStream ms = new MemoryStream(buff);133 BinaryReader buffers = new BinaryReader(ms, UTF8Encoding.Default);134 try135 {136 byte[] _buff;137 Label_0073:138 //判斷本次解析的字節(jié)是否滿足常量字節(jié)數(shù) 139 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght)140 {141 _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position));142 this._LBuff.AddRange(_buff);143 }144 else145 {146 long offset = 0;147 switch (JN)148 {149 case JavaOrNet.Java:150 offset = ReadInt(buffers.ReadBytes(4));151 break;152 case JavaOrNet.Net:153 offset = buffers.ReadInt32();154 break;155 }156 157 //剩余字節(jié)數(shù)大于本次需要讀取的字節(jié)數(shù)158 if (offset <= (buffers.BaseStream.Length - buffers.BaseStream.Position))159 {160 int msgID = 0;161 switch (JN)162 {163 case JavaOrNet.Java:164 msgID = ReadInt(buffers.ReadBytes(4));165 break;166 case JavaOrNet.Net:167 msgID = buffers.ReadInt32();168 break;169 }170 _buff = buffers.ReadBytes((int)(offset - 4));171 list.Add(new SocketMessage(msgID, _buff));172 goto Label_0073;173 }174 else175 {176 //剩余字節(jié)數(shù)剛好小于本次讀取的字節(jié)數(shù) 存起來,等待接受剩余字節(jié)數(shù)一起解析177 buffers.BaseStream.Seek(ConstLenght, SeekOrigin.Current);178 _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position));179 this._LBuff.AddRange(_buff);180 }181 }182 }183 catch { }184 finally185 {186 buffers.Close();187 if (buffers != null) { buffers.Dispose(); }188 ms.Close();189 if (ms != null) { ms.Dispose(); }190 }191 return list;192 }193 }194 }
謝謝觀賞~!
新聞熱點
疑難解答