TCP网络通信框架的讲解与使用

客户端

1.消息基类BaseMessage

在消息基类中定义了得到消息ID消息长度的函数,以便于我们后续解析消息,处理分包与粘包。

派生类BaseMessage<T>

这个类的T泛型被约束为Google.Protobuf.IMessage,是Protobuf的基本消息类型,我们的BaseMessage<T>中保存着一个dataT类型用于承载数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public abstract class BaseMessage
{
public abstract int GetByteLength();

public abstract byte[] GetBytes();

public abstract int GetMessageID();

public abstract void WriteIn(byte[] buffer, int beginIndex,int length);
}

public abstract class BaseMessage<T> : BaseMessage where T : Google.Protobuf.IMessage,new()
{
public T data = new T();

public override int GetByteLength()
{
return 8 + (data == null ? 0 : data.CalculateSize());
}

public override byte[] GetBytes()
{
byte[] buffer = new byte[GetByteLength()];
BitConverter.GetBytes(GetMessageID()).CopyTo(buffer, 0);
BitConverter.GetBytes(GetByteLength()).CopyTo(buffer, 4);
if (buffer.Length > 8)
data.ToByteArray().CopyTo(buffer, 8);
return buffer;
}

public override int GetMessageID()
{
throw new NotImplementedException();
}

public override void WriteIn(byte[] buffer, int beginIndex,int length)
{
throw new NotImplementedException();
}
}

2.NetAsyncMgr网络管理器

这个类的作用是用来建立对服务器的TCP连接。

值得一提的是这个网络管理器是完全用异步函数写的,这说明不会造成阻塞
不会造成阻塞就可以避免开启其他线程执行任务,避免资源浪费

连接

public static void Connect(string host, int port)

通过Socket来建立TCP连接,初始化本地Socket,为SocketAsyncEventArgs设置服务器端口,再添加连接成功(三次握手)之后的回调,在回调中开启异步接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 public static void Connect(string host, int port)
{
if (isConnected) return;

IPEndPoint SeveriPEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

SocketAsyncEventArgs argsConnect = new SocketAsyncEventArgs();
argsConnect.RemoteEndPoint = SeveriPEndPoint;

argsConnect.Completed += (socket, args1) =>
{
if (args1.SocketError == SocketError.Success)
{
Debug.Log($"连接成功: {host}:{port}");
SendHeartMessage();
//接收消息
SocketAsyncEventArgs argsRecive = new SocketAsyncEventArgs();
argsRecive.SetBuffer(bufferBytes, 0, bufferBytes.Length);
argsRecive.Completed += Recive;
m_socket.ReceiveAsync(argsRecive);
isConnected = true;
}
else
{
Debug.Log($"连接失败:{args1.SocketError}");
}
};
m_socket.ConnectAsync(argsConnect);
}

发送

public static void Send(BaseMessage info)

这里同样使用异步函数,进行发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void Send(BaseMessage info)
{
if (m_socket != null && m_socket.Connected && isConnected)
{
byte[] bytes = info.GetBytes();

SocketAsyncEventArgs argsSend = new SocketAsyncEventArgs();
argsSend.SetBuffer(bytes, 0, bytes.Length);
argsSend.Completed += (socket, args) =>
{
if (args.SocketError == SocketError.Success)
{
}
else
{
Debug.Log($"{args.SocketError}");
Close();
}
};
m_socket.SendAsync(argsSend);
}
else
{
if (isConnected == true)
Close();
}
}

接收消息

在建立好连接后立即开始了消息的接收,每一次接成功收到消息后再次开启接收异步函数。

接收到消息后我们对消息进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static void Recive(object socket, SocketAsyncEventArgs args)
{
if (args.SocketError == SocketError.Success)
{
int bytesLength = args.BytesTransferred;

HandleReceiveMessage(bytesLength);

//接收消息
if (socket != null && m_socket.Connected && isConnected)
args.SetBuffer(bufferLenght, bufferBytes.Length);
m_socket.ReceiveAsync(args);
}
else
{
Debug.Log($"{args.SocketError}");
if (isConnected == true)
Close();
}
}

处理消息

Try Catch进行异常捕获,并在接收消息时进行处理分包和粘包现象。

Connect函数中,设置了接收到的消息的储存位置 argsRecive.SetBuffer(bufferBytes, 0, bufferBytes.Length);bufferBytes的第零位开始存储,大小为bufferBytes的长度。

  • bufferLenght是待处理的消息长度,将bufferLenght+=reciveLength
  • 循环
    • bufferLenght >= 8的时候才能够解析出消息ID消息长度,否则不是一个完整的消息。
    • bufferLenght - currentIndex >= messageBodyLength && massageID != -1如果解析出消息,且接收到的消息长度大于解析到的消息长度,那么可以解析
      • 根据ID实例化一个消息
      • 通过接收到的数据反序列化消息
      • 将消息推进消息队列
      • currentIndex += messageBodyLength
      • if (currentIndex == bufferLenght)判断是否还有剩余的消息
        • 没有bufferLenght置零
        • 结束循环
      • else
        • 还有字节没有解析,说明发生了粘包
        • 继续循环解析
    • 否则接收到的消息长度小于解析到的消息长度,发生了分包
      • 将当前接受的残缺的消息放到bufferLenght的开头
      • 消息长度设置为残缺的消息的长度
      • 停止循环,这样等待到下一个消息接收到就可以拼接成完整消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 private static void HandleReceiveMessage(int reciveLength)
{
try
{
if (reciveLength == 0) return;

//处理
int massageID = -1;
int messageBodyLength = 0;
int currentIndex = 0;


bufferLenght += reciveLength;

while (true) //粘包
{
if (bufferLenght >= 8)
{
//ID
massageID = BitConverter.ToInt32(bufferBytes, currentIndex);
currentIndex += 4;
//长度
messageBodyLength = BitConverter.ToInt32(bufferBytes, currentIndex) - 8;
currentIndex += 4;
}

if (bufferLenght - currentIndex >= messageBodyLength && massageID != -1)
{
//消息体
BaseMessage baseMassage = MessagePool.GetMessage(massageID);
baseMassage.WriteIn(bufferBytes, currentIndex, messageBodyLength);

reciveMessageQueue.Enqueue(baseMassage);

currentIndex += messageBodyLength;
if (currentIndex == bufferLenght)
{
bufferLenght = 0;
break;
}
}
else //分包
{
Array.Copy(bufferBytes, currentIndex - 8, bufferBytes, 0, bufferLenght - currentIndex + 8);
bufferLenght = bufferLenght - currentIndex + 8;
break;
}
}
}
catch (Exception e)
{
Console.WriteLine($"消息解析出错: {e.Message}");
}
}

消息分发

注意哦:要调用MsgUpdate这个消息才会开启消息分发的哦
可以考虑用其他线程来处理消息

MsgUpdate循环中

  • 如果消息队列为空返回
  • 循环MAX_MESSAGE_FIRE
    • 从队列中取一条消息
    • 取到了解析消息
    • 取不到,队列为空,结束循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void MsgUpdate()
{
//初步判断,提升效率
if (reciveMessageQueue.Count == 0)
{
return;
}

//重复处理消息
for (int i = 0; i < MAX_MESSAGE_FIRE; i++)
{
//获取第一条消息
BaseMessage msgBase = null;
lock (reciveMessageQueue)
{
if (reciveMessageQueue.Count > 0)
{
msgBase = reciveMessageQueue.Dequeue();
}
}

//分发消息
if (msgBase != null)
{
listeners[msgBase.GetMessageID()]?.Invoke(msgBase);
}
//没有消息了
else
{
break;
}
}
}

添加网络消息监听

一个键为消息ID,值为事件的字典。

1
2
//监听消息
private static Dictionary<int, Action<BaseMessage>> listeners = new Dictionary<int, Action<BaseMessage>>();

通过以下函数添加、移除事件监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/// <summary>
/// 添加消息监听
/// </summary>
/// <param name="messageID"></param>
/// <param name="callback"></param>
public static void AddNetMessageListener(int messageID, Action<BaseMessage> callback)
{
if (listeners.ContainsKey(messageID))
listeners[messageID] += callback;
else
Debug.LogWarning("没有这个消息类型" + messageID);
}

/// <summary>
/// 移除网络消息监听
/// </summary>
/// <param name="messageID"></param>
/// <param name="callback"></param>
public static void RemoveNetMessageListener(int messageID, Action<BaseMessage> callback)
{
if (listeners.ContainsKey(messageID))
listeners[messageID] -= callback;
else
Debug.LogWarning("没有这个消息类型" + messageID);
}

3.消息生成器与消息池

消息生成

通过编写Proto文件来定义消息。

如果你知道Protobuf那么请使用其语法编写你需要的消息类,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
 syntax = "proto3";  
 package NetGameRunning;
 message GlobalChatData {
    string chat_words = 1;
 }
 message EmptyMessageData {
    float x = 1;
    float y = 2;
    float z = 3;
    float ex = 4;
    float ey = 5;
    float ez = 6;
 }

并且我们需要一个XML文件来进行消息IDProto类的映射,例如:

1
2
3
4
5
6
7
8
9
<messages>
<message id="1" systemMessage="1" name="QuitMessage" namespace="NetSystem" />
<message id="2" systemMessage="1" name="HeartMessage" namespace="NetSystem" />
<!-- 以下是示例文件中的消息,不需要使用示例代码可以删除 -->
<message id="10002" systemMessage="0" name="ChatMessage" namespace="NetGameRunning"
datatype="NetGameRunning.GlobalChatData" />
<message id="10003" systemMessage="0" name="EmptyMessage" namespace="NetGameRunning"
datatype="NetGameRunning.EmptyMessageData" />
</messages>

id:消息ID
namespace:命名空间,对应proto中的package
name:消息类名(自己定义的,注意不要proto中的message相同)
datatype:消息中data(上文中提到过是BaseMessage中的泛型T),一般是对应proto中的package.message

最后我们指定生成位置proto.exe位置后,依次点击生成消息类,生成消息池

消息池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class MessagePool
{
public static int QuitMessage_ID = 1;
public static int HeartMessage_ID = 2;
public static int ChatMessage_ID = 10002;
public static int EmptyMessage_ID = 10003;
static int[] messageIDs = new int[] { 1, 2, 10002, 10003 };
public static int[] MessageIDs => messageIDs;
private static readonly System.Collections.Generic.Dictionary<int, System.Func<BaseMessage>> MessageTypeMap = new System.Collections.Generic.Dictionary<int, System.Func<BaseMessage>>
{
{1,() => new NetSystem.QuitMessage()},
{2,() => new NetSystem.HeartMessage()},
{10002,() => new NetGameRunning.ChatMessage()},
{10003,() => new NetGameRunning.EmptyMessage()}
};
public static BaseMessage GetMessage(int id)
{
if (MessageTypeMap.TryGetValue(id, out System.Func<BaseMessage> messageFactory)) { return messageFactory?.Invoke(); }
return null;
}
}

这个消息池类由代码生成,使用了一个字典,用初始化的方式将消息ID消息类型映射,通过消息ID返回一个对应类型,以便正确地解析消息

服务器

服务器采用全异步方式编写,避免阻塞线程,避免开启其他线程增加开销

开启服务器 ServerSocket

Start函数里面

  • 初始化了socket
  • 并且开启了接收客户端连接,并且传入连接成功后的回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void Start(string ip, int port, int num)
{
this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
try
{
socket.Bind(ipEndPoint);
socket.Listen(num);
this.socket.BeginAccept(Accept, this.socket);
}
catch (Exception e)
{
Console.WriteLine($"服务器开启失败: {e.Message}");
}
}

Accept函数中

  • 将连接的客户端加入clientSockets列表
  • 继续接收客户端连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void Accept(IAsyncResult result)
{
try
{
Socket clientSocket = this.socket.EndAccept(result);
ClientSocket client = new ClientSocket(clientSocket);
clientSockets.Add(client.clientID, client);

Console.WriteLine($"客户端[{clientSocket.RemoteEndPoint}]连接服务器");

this.socket.BeginAccept(Accept, this.socket);
}
catch (Exception e)
{
Console.WriteLine($"客户端接入失败: {e.Message}");
}
}

ClientSocket

这里的ClientSocket与客户端中的ClientSocket没有太大区别

仅有的不同之处ThreadPool.QueueUserWorkItem(HandleMassage, baseMassage);,这里用线程池来处理消息,消息处理方式也是仅仅将消息分发出去。

其实感觉Unity哪里也可以用线程池
但是有一个问题就是线程太多了
还有在Unity中用多线程总归不太好,因为只能从主线程访问Unity相关组件、对象以及UnityEngine命名空间中的绝大部分内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void HandleReceiveMessage(int bytesLength)
{
//省略...

while (true)//粘包
{
if (bufferLenght >= 8)
{
//省略...
}

if (bufferLenght - currentIndex >= massageBodyLength && massageBodyLength != -1 && massageID != -1)
{
//消息体
BaseMessage baseMassage = MessagePool.GetMessage(massageID);

if (baseMassage != null)
{
//省略...

ThreadPool.QueueUserWorkItem(HandleMassage, baseMassage);
}

//省略...
}
else//分包
{
//省略...
}
}
}

消息监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void AddNetMessageListener(int messageID, Action<BaseMessage> callback)
{
if (listeners.ContainsKey(messageID))
listeners[messageID] += callback;
else
Debug.LogWarning("没有这个消息类型" + messageID);
}

public static void RemoveNetMessageListener(int messageID, Action<BaseMessage> callback)
{
if (listeners.ContainsKey(messageID))
listeners[messageID] -= callback;
else
Debug.LogWarning("没有这个消息类型" + messageID);
}

同样的,使用AddNetMessageListenerRemoveNetMessageListener对消息进行添加监听和移除。

消息ID可以根据消息池来获取
例如:MessagePool.LoginMessage_ID