客户端
1.消息基类BaseMessage
在消息基类中定义了得到消息ID、消息长度的函数,以便于我们后续解析消息,处理分包与粘包。
派生类BaseMessage<T>
这个类的T
泛型被约束为Google.Protobuf.IMessage
,是Protobuf
的基本消息类型,我们的BaseMessage<T>
中保存着一个data
是T
类型用于承载数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public abstract class BaseMessage { public abstract int GetByteLength();
public abstract byte[] GetBytes();
public abstract int GetMessageID();
public abstract void WriteIn(byte[] buffer, int beginIndex,int length); }
public abstract class BaseMessage<T> : BaseMessage where T : Google.Protobuf.IMessage,new() { public T data = new T();
public override int GetByteLength() { return 8 + (data == null ? 0 : data.CalculateSize()); }
public override byte[] GetBytes() { byte[] buffer = new byte[GetByteLength()]; BitConverter.GetBytes(GetMessageID()).CopyTo(buffer, 0); BitConverter.GetBytes(GetByteLength()).CopyTo(buffer, 4); if (buffer.Length > 8) data.ToByteArray().CopyTo(buffer, 8); return buffer; }
public override int GetMessageID() { throw new NotImplementedException(); }
public override void WriteIn(byte[] buffer, int beginIndex,int length) { throw new NotImplementedException(); } }
|
2.NetAsyncMgr
网络管理器
这个类的作用是用来建立对服务器的TCP
连接。
值得一提的是这个网络管理器是完全用异步函数写的,这说明不会造成阻塞
不会造成阻塞就可以避免开启其他线程执行任务,避免资源浪费
连接
public static void Connect(string host, int port)
通过Socket
来建立TCP
连接,初始化本地Socket
,为SocketAsyncEventArgs
设置服务器端口,再添加连接成功(三次握手)之后的回调,在回调中开启异步接收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public static void Connect(string host, int port) { if (isConnected) return;
IPEndPoint SeveriPEndPoint = new IPEndPoint(IPAddress.Parse(host), port); m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
SocketAsyncEventArgs argsConnect = new SocketAsyncEventArgs(); argsConnect.RemoteEndPoint = SeveriPEndPoint;
argsConnect.Completed += (socket, args1) => { if (args1.SocketError == SocketError.Success) { Debug.Log($"连接成功: {host}:{port}"); SendHeartMessage(); SocketAsyncEventArgs argsRecive = new SocketAsyncEventArgs(); argsRecive.SetBuffer(bufferBytes, 0, bufferBytes.Length); argsRecive.Completed += Recive; m_socket.ReceiveAsync(argsRecive); isConnected = true; } else { Debug.Log($"连接失败:{args1.SocketError}"); } }; m_socket.ConnectAsync(argsConnect); }
|
发送
public static void Send(BaseMessage info)
这里同样使用异步函数,进行发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public static void Send(BaseMessage info) { if (m_socket != null && m_socket.Connected && isConnected) { byte[] bytes = info.GetBytes();
SocketAsyncEventArgs argsSend = new SocketAsyncEventArgs(); argsSend.SetBuffer(bytes, 0, bytes.Length); argsSend.Completed += (socket, args) => { if (args.SocketError == SocketError.Success) { } else { Debug.Log($"{args.SocketError}"); Close(); } }; m_socket.SendAsync(argsSend); } else { if (isConnected == true) Close(); } }
|
接收消息
在建立好连接后立即开始了消息的接收,每一次接成功收到消息后再次开启接收异步函数。
接收到消息后我们对消息进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private static void Recive(object socket, SocketAsyncEventArgs args) { if (args.SocketError == SocketError.Success) { int bytesLength = args.BytesTransferred;
HandleReceiveMessage(bytesLength);
if (socket != null && m_socket.Connected && isConnected) args.SetBuffer(bufferLenght, bufferBytes.Length); m_socket.ReceiveAsync(args); } else { Debug.Log($"{args.SocketError}"); if (isConnected == true) Close(); } }
|
处理消息
用Try Catch
进行异常捕获,并在接收消息时进行处理分包和粘包现象。
在Connect
函数中,设置了接收到的消息的储存位置 argsRecive.SetBuffer(bufferBytes, 0, bufferBytes.Length);
从bufferBytes
的第零位开始存储,大小为bufferBytes
的长度。
- bufferLenght是待处理的消息长度,将
bufferLenght+=reciveLength
- 循环
- 当
bufferLenght >= 8
的时候才能够解析出消息ID、消息长度,否则不是一个完整的消息。
bufferLenght - currentIndex >= messageBodyLength && massageID != -1
如果解析出消息,且接收到的消息长度大于解析到的消息长度,那么可以解析
- 根据ID实例化一个消息
- 通过接收到的数据反序列化消息
- 将消息推进消息队列
currentIndex += messageBodyLength
if (currentIndex == bufferLenght)
判断是否还有剩余的消息
else
- 否则接收到的消息长度小于解析到的消息长度,发生了分包
- 将当前接受的残缺的消息放到
bufferLenght
的开头
- 消息长度设置为残缺的消息的长度
- 停止循环,这样等待到下一个消息接收到就可以拼接成完整消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| private static void HandleReceiveMessage(int reciveLength) { try { if (reciveLength == 0) return;
int massageID = -1; int messageBodyLength = 0; int currentIndex = 0;
bufferLenght += reciveLength;
while (true) { if (bufferLenght >= 8) { massageID = BitConverter.ToInt32(bufferBytes, currentIndex); currentIndex += 4; messageBodyLength = BitConverter.ToInt32(bufferBytes, currentIndex) - 8; currentIndex += 4; }
if (bufferLenght - currentIndex >= messageBodyLength && massageID != -1) { BaseMessage baseMassage = MessagePool.GetMessage(massageID); baseMassage.WriteIn(bufferBytes, currentIndex, messageBodyLength);
reciveMessageQueue.Enqueue(baseMassage);
currentIndex += messageBodyLength; if (currentIndex == bufferLenght) { bufferLenght = 0; break; } } else { Array.Copy(bufferBytes, currentIndex - 8, bufferBytes, 0, bufferLenght - currentIndex + 8); bufferLenght = bufferLenght - currentIndex + 8; break; } } } catch (Exception e) { Console.WriteLine($"消息解析出错: {e.Message}"); } }
|
消息分发
注意哦:要调用MsgUpdate这个消息才会开启消息分发的哦
可以考虑用其他线程来处理消息
在MsgUpdate
循环中
- 如果消息队列为空返回
- 循环
MAX_MESSAGE_FIRE
次
- 从队列中取一条消息
- 取到了解析消息
- 取不到,队列为空,结束循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public static void MsgUpdate() { if (reciveMessageQueue.Count == 0) { return; }
for (int i = 0; i < MAX_MESSAGE_FIRE; i++) { BaseMessage msgBase = null; lock (reciveMessageQueue) { if (reciveMessageQueue.Count > 0) { msgBase = reciveMessageQueue.Dequeue(); } }
if (msgBase != null) { listeners[msgBase.GetMessageID()]?.Invoke(msgBase); } else { break; } } }
|
添加网络消息监听
一个键为消息ID,值为事件的字典。
1 2
| private static Dictionary<int, Action<BaseMessage>> listeners = new Dictionary<int, Action<BaseMessage>>();
|
通过以下函数添加、移除事件监听:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
public static void AddNetMessageListener(int messageID, Action<BaseMessage> callback) { if (listeners.ContainsKey(messageID)) listeners[messageID] += callback; else Debug.LogWarning("没有这个消息类型" + messageID); }
public static void RemoveNetMessageListener(int messageID, Action<BaseMessage> callback) { if (listeners.ContainsKey(messageID)) listeners[messageID] -= callback; else Debug.LogWarning("没有这个消息类型" + messageID); }
|
3.消息生成器与消息池
消息生成
通过编写Proto
文件来定义消息。
如果你知道Protobuf
那么请使用其语法编写你需要的消息类,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13
| syntax = "proto3"; package NetGameRunning; message GlobalChatData { string chat_words = 1; } message EmptyMessageData { float x = 1; float y = 2; float z = 3; float ex = 4; float ey = 5; float ez = 6; }
|
并且我们需要一个XML
文件来进行消息ID
和Proto类
的映射,例如:
1 2 3 4 5 6 7 8 9
| <messages> <message id="1" systemMessage="1" name="QuitMessage" namespace="NetSystem" /> <message id="2" systemMessage="1" name="HeartMessage" namespace="NetSystem" /> <message id="10002" systemMessage="0" name="ChatMessage" namespace="NetGameRunning" datatype="NetGameRunning.GlobalChatData" /> <message id="10003" systemMessage="0" name="EmptyMessage" namespace="NetGameRunning" datatype="NetGameRunning.EmptyMessageData" /> </messages>
|
id
:消息ID
namespace
:命名空间,对应proto
中的package
name
:消息类名(自己定义的,注意不要与proto
中的message
相同)
datatype
:消息中data
(上文中提到过是BaseMessage
中的泛型T
),一般是对应proto
中的package.message
最后我们指定生成位置和proto.exe位置后,依次点击生成消息类,生成消息池
消息池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static class MessagePool { public static int QuitMessage_ID = 1; public static int HeartMessage_ID = 2; public static int ChatMessage_ID = 10002; public static int EmptyMessage_ID = 10003; static int[] messageIDs = new int[] { 1, 2, 10002, 10003 }; public static int[] MessageIDs => messageIDs; private static readonly System.Collections.Generic.Dictionary<int, System.Func<BaseMessage>> MessageTypeMap = new System.Collections.Generic.Dictionary<int, System.Func<BaseMessage>> { {1,() => new NetSystem.QuitMessage()}, {2,() => new NetSystem.HeartMessage()}, {10002,() => new NetGameRunning.ChatMessage()}, {10003,() => new NetGameRunning.EmptyMessage()} }; public static BaseMessage GetMessage(int id) { if (MessageTypeMap.TryGetValue(id, out System.Func<BaseMessage> messageFactory)) { return messageFactory?.Invoke(); } return null; } }
|
这个消息池类由代码生成,使用了一个字典,用初始化的方式将消息ID和消息类型映射,通过消息ID返回一个对应类型,以便正确地解析消息。
服务器
服务器采用全异步方式编写,避免阻塞线程,避免开启其他线程增加开销
开启服务器 ServerSocket
在Start
函数里面
- 初始化了
socket
- 并且开启了接收客户端连接,并且传入连接成功后的回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void Start(string ip, int port, int num) { this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Parse(ip), port); try { socket.Bind(ipEndPoint); socket.Listen(num); this.socket.BeginAccept(Accept, this.socket); } catch (Exception e) { Console.WriteLine($"服务器开启失败: {e.Message}"); } }
|
在Accept
函数中
- 将连接的客户端加入
clientSockets
列表
- 继续接收客户端连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void Accept(IAsyncResult result) { try { Socket clientSocket = this.socket.EndAccept(result); ClientSocket client = new ClientSocket(clientSocket); clientSockets.Add(client.clientID, client);
Console.WriteLine($"客户端[{clientSocket.RemoteEndPoint}]连接服务器");
this.socket.BeginAccept(Accept, this.socket); } catch (Exception e) { Console.WriteLine($"客户端接入失败: {e.Message}"); } }
|
ClientSocket
这里的ClientSocket
与客户端中的ClientSocket
没有太大区别
仅有的不同之处ThreadPool.QueueUserWorkItem(HandleMassage, baseMassage);
,这里用线程池来处理消息,消息处理方式也是仅仅将消息分发出去。
其实感觉Unity哪里也可以用线程池
但是有一个问题就是线程太多了
还有在Unity中用多线程总归不太好,因为只能从主线程访问Unity相关组件、对象以及UnityEngine命名空间中的绝大部分内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| private void HandleReceiveMessage(int bytesLength) {
while (true) { if (bufferLenght >= 8) { }
if (bufferLenght - currentIndex >= massageBodyLength && massageBodyLength != -1 && massageID != -1) { BaseMessage baseMassage = MessagePool.GetMessage(massageID);
if (baseMassage != null) {
ThreadPool.QueueUserWorkItem(HandleMassage, baseMassage); }
} else { } } }
|
消息监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void AddNetMessageListener(int messageID, Action<BaseMessage> callback) { if (listeners.ContainsKey(messageID)) listeners[messageID] += callback; else Debug.LogWarning("没有这个消息类型" + messageID); }
public static void RemoveNetMessageListener(int messageID, Action<BaseMessage> callback) { if (listeners.ContainsKey(messageID)) listeners[messageID] -= callback; else Debug.LogWarning("没有这个消息类型" + messageID); }
|
同样的,使用AddNetMessageListener
和RemoveNetMessageListener
对消息进行添加监听和移除。
消息ID可以根据消息池来获取
例如:MessagePool.LoginMessage_ID