TCP数据流
系统缓冲区
当收到对端数据时,操作系统会将数据存入到Socket的接收缓冲区中
操作系统层面上的缓冲区完全由操作系统操作,程序并不能直接操作它们,只能通过socket.Receive、socket.Send等方法来间接操作。当系统的接收缓冲区为空,Receive方法会被阻塞,直到里面有数据。同样地,Socket的Send方法只是把数据写入到发送缓冲区里,具体的发送过程由操作系统负责。当操作系统的发送缓冲区满了,Send方法将会阻塞
粘包半包现象
如果发送端快速发送多条数据,接收端没有及时调用Receive,那么数据便会在接收端的缓冲区中累积
解决粘包半包现象
一般有三种方法可以解决粘包和半包问题,分别是长度信息法、固定长度法和结束符号法
长度信息法
长度信息法是指在每个数据包前面加上长度信息。每次接收到数据后,先读取表示长度的字节,如果缓冲区的数据长度大于要取的字节数,则取出相应的字节,否则等待下一次数据接收。
游戏程序一般会使用16位整型数或32位整型数来存放长度信息 。16位整型数的取值范围是0~65535,32位整型数的取值范围是0~4294967295。对于大部分游戏,网络消息的长度很难超过65535字节,使用16位整型数来存放长度信息较合适
固定长度法
每次都以相同的长度发送数据,假设规定每条信息的长度都为10个字符,那么发送“Hello”“Unity”两条信息可以发送成“He llo... ”“Unity... ”,其中的“. ”表示填充字符,是为凑数,没有实际意义,只为了每次发送的数据都有固定长度。接收方每次读取10个字符,作为一条消息去处理。如果读到的字符数大于10,比如第1次读到“He llo...Un”,那它只要把前10个字节“Hello... ”抽取出来,再把后面的两个字节“Un”存起来,等到再次接收数据,拼接第二条信息。
结束符号法
规定一个结束符号,作为消息间的分隔符
实现
发送数据
//点击发送按钮
public void Send(string sendStr)
{
//组装协议
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
Int16 len = (Int16)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(len);
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
//为了精简代码:使用同步Send
//不考虑抛出异常
socket.Send( sendBytes);
}
接收数据
游戏程序一般会使用“长度信息法”处理粘包问题,核心思想是定义一个缓冲区(readBuff)和一个指示缓冲区有效数据长度变量(buffCount)。
//接收缓冲区
byte[] readBuff = new byte[1024];
//接收缓冲区的数据长度
int buffCount = 0;
比如,readBuff中有5个字节的数据“world”(其余为byte的默认值0),那么buffCount的值应是5
因为存在粘包现象,缓冲区里面会保存尚未处理的数据。所以接收数据时不再从缓冲区开头的位置写入,而是把新数据放在有效数据之后
如果使用异步Socket, BeginReceive的参数应填成下面的样子:
socket.BeginReceive(readBuff, //缓冲区
buffCount, //开始位置
1024-buffCount, //最多读取多少数据
0, //标志位,设成0即可
ReceiveCallback, //回调函数
socket); //状态
在收到数据后,程序需要更新buffCount,以使下一次接收数据时,写入到缓冲区有效数据的末尾
public void ReceiveCallback(IAsyncResult ar){
Socket socket = (Socket) ar.AsyncState;
//获取接收数据长度
int count = socket.EndReceive(ar);
buffCount+=count;
……
}
处理数据
收到数据后,如果缓冲区的数据足够长,超过1条消息的长度,就把消息提取出来处理。如果数据长度不够,不去处理它,等待下一次接收数据。
public void OnReceiveData(){
//消息长度
if(buffCount <= 2)
return;
Int16 bodyLength = BitConverter.ToInt16(readBuff, 0);
//消息体
if(buffCount < 2+bodyLength)
return;
string s = System.Text.Encoding.UTF8.GetString(readBuff, 2, buffCount);
//s是消息内容
//更新缓冲区
int start = 2 + bodyLength;
int count = buffCount - start;
Array.Copy(readBuff, start, readBuff, 0, count);
buffCount -= start;
//继续读取消息
if(readBuff.length > 2){
OnReceiveData();
}
}
读取出的缓冲区数据已经没有用了,需要删除它。一个直观的办法是将缓冲区后面的数据向前移位
移动缓冲区数据可使用Array.Copy方法,它的原型如下:
public static void Copy(
Array sourceArray,
long sourceIndex,
Array destinationArray,
long destinationIndex,
long length
)
sourceArray代表源数组,destinationArray代表目标数据,sourceIndex代表源数组的起始位置,destinationIndex代表目标数组的起始位置,length代表要复制的消息的长度。
public void OnReceiveData(){
//处理一条消息(略)
//更新缓冲区
int start = 2 + bodyLength;
int count = buffCount - start;
Array.Copy(readBuff, start, readBuff, 0, count);
buffCount -= start;
//如果有更多消息,就处理它
}
完整示例
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
using System;
using System.Linq;
public class Echo : MonoBehaviour {
//定义套接字
Socket socket;
//UGUI
public InputField InputFeld;
public Text text;
//接收缓冲区
byte[] readBuff = new byte[1024];
//接收缓冲区的数据长度
int buffCount = 0;
//显示文字
string recvStr = "";
//点击连接按钮
public void Connection()
{
//Socket
socket = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//为了精简代码:使用同步Connect
//不考虑抛出异常
socket.Connect("127.0.0.1", 8888);
socket.BeginReceive( readBuff, buffCount, 1024-buffCount, 0,
ReceiveCallback, socket);
}
//Receive回调
public void ReceiveCallback(IAsyncResult ar){
try {
Socket socket = (Socket) ar.AsyncState;
//获取接收数据长度
int count = socket.EndReceive(ar);
buffCount+=count;
//处理二进制消息
OnReceiveData();
//继续接收数据
socket.BeginReceive( readBuff, buffCount, 1024-buffCount, 0,
ReceiveCallback, socket);
}
catch (SocketException ex){
Debug.Log("Socket Receive fail" + ex.ToString());
}
}
public void OnReceiveData(){
Debug.Log("[Recv 1] buffCount=" +buffCount);
Debug.Log("[Recv 2] readbuff=" + BitConverter.ToString(readBuff));
//消息长度
if(buffCount <= 2)
return;
Int16 bodyLength = BitConverter.ToInt16(readBuff, 0);
Debug.Log("[Recv 3] bodyLength=" +bodyLength);
//消息体
if(buffCount < 2+bodyLength)
return;
string s = System.Text.Encoding.UTF8.GetString(readBuff, 2, buffCount);
Debug.Log("[Recv 4] s=" +s);
//更新缓冲区
int start = 2 + bodyLength;
int count = buffCount - start;
Array.Copy(readBuff, start, readBuff, 0, count);
buffCount -= start;
Debug.Log("[Recv 5] buffCount=" +buffCount);
//消息处理
recvStr = s + "\n" + recvStr;
//继续读取消息
OnReceiveData();
}
//点击发送按钮
public void Send()
{
string sendStr = InputFeld.text;
//组装协议
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
Int16 len = (Int16)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(len);
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();
//为了精简代码:使用同步Send
//不考虑抛出异常
socket.Send(sendBytes);
Debug.Log("[Send]" + BitConverter.ToString(sendBytes));
}
public void Update(){
text.text = recvStr;
}
}
- 使用buffCount记录缓冲区的数据长度,使缓冲区可以保存多条数据;
- 接收数据(BeginReceive)的起点改为buffCount,由于缓冲区总长度为1024,所以最大能接收的数据长度变成了1024-buffCount;
- 通过OnReceiveData处理消息
- 给发送的消息添加长度信息。
大端小端问题
下面是经过简化的BitConverter.ToInt16源码,其中的IsLittleEndian代表这台计算机是大端编码还是小端编码,不同的计算机编码方式会有不同。
public static short ToInt16(byte[] value, int startIndex) {
if( startIndex % 2 == 0) { // data is aligned
return *((short *) pbyte);
}
else {
if( IsLittleEndian) {
return (short)((*pbyte) | (*(pbyte + 1) << 8)) ;
}
else {
return (short)((*pbyte << 8) | (*(pbyte + 1)));
}
}
完整发送数据
如何解决发送不完整问题
要让数据能够发送完整,需要在发送前将数据保存起来;如果发送不完整,在Send回调函数中继续发送数据,示意代码如下。
//定义发送缓冲区
byte[] sendBytes = new byte[1024];
//缓冲区偏移值
int readIdx = 0;
//缓冲区剩余长度
int length = 0;
//点击发送按钮
public void Send()
{
sendBytes = 要发送的数据;
length = sendBytes.Length; //数据长度
readIdx = 0;
socket.BeginSend(sendBytes, 0, length, 0, SendCallback, socket);
}
//Send回调
public void SendCallback(IAsyncResult ar){
//获取state
Socket socket = (Socket) ar.AsyncState;
//EndSend的处理
int count = socket.EndSend(ar);
readIdx + =count;
length -= count;
//继续发送
if(length > 0){
socket.BeginSend(sendBytes,
readIdx, length, 0, SendCallback, socket);
}
}
socket.BeginSend(sendBytes, //发送缓冲区
readIdx, //从索引为6的数据开始发送
length, //因为缓冲区只剩下4个数据,最多发送4个数据
0, //标志位,设置为0即可
SendCallback, //回调函数
socket); //传给回调函数的对象
上面的方案解决了一半问题,因为调用BeginSend之后,可能要隔一段时间才会调用回调函数,如果玩家在SendCallback被调用之前再次点击发送按钮,按照前面的写法,会重置readIdx和length, SendCallback也就不可能正确工作了。为此我们设计了加强版的发送缓冲区,叫作写入队列(writeQueue),它的结构如图
图展示了一个包含三个缓冲区的写入队列,当玩家点击发送按钮时,数据会被写入队列的末尾,比如一开始发送“08hellolpy”,那么就在队列里添加一个缓冲区,这个缓冲区和本节前面介绍的缓冲区一样,包含一个bytes数组,以及指向缓冲区开始位置的readIdx、缓冲区剩余长度的length。Send方法会做这样的处理,示意代码如下:
public void Send() {
sendBytes = 要发送的数据;
writeQueue.Enqueue(ba); //假设ba封装了readbuff、readIdx、length等数据
if(writeQueue只有一条数据){
socket.BeginSend(参数略);
}
}
public void SendCallback(IAsyncResult ar){
count = socket.EndSend(ar);
ByteArray ba = writeQueue.First(); //ByteArray后面再介绍
ba.readIdx+=count; //length的处理略
if(发送不完整){
取出第一条数据,再次发送
}
else if(发送完整,且writeQueue还有数据){
删除第一条数据
取出第二条数据,如有,发送
}
}
ByteArray 和 Queue
ByteArray是封装byte[]、readIdx和length的类,可以这样定义它(添加文件ByteArray.cs):
using System;
public class ByteArray {
//缓冲区
public byte[] bytes;
//读写位置
public int readIdx = 0;
public int writeIdx = 0;
//数据长度
public int length { get { return writeIdx-readIdx; }}
//构造函数
public ByteArray(byte[] defaultBytes){
bytes = defaultBytes;
readIdx = 0;
writeIdx = defaultBytes.Length;
}
}
byte[] sendBytes = new byte[]{'0', '3', 'c', 'a', 't'};
ByteArray ba = new ByteArray(sendBytes);
socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, socket);
Queue
Queue<ByteArray> writeQueue = new Queue<ByteArray>();
ByteArray ba = new ByteArray(sendBytes);
writeQueue.Enqueue(ba); //将ba放入队列
ByteArray ba2 = writeQueue.First(); //获取writeQueue的第一个元素,队列保持不变
be2 = writeQueue.Dequeue(); //弹出队列的第一个元素
Enqueue代表把元素放入到队列中,该元素会放到队列的末尾;Dequeue代表出列,队列的第一个元素被弹出来;First代表获取队列的第一个元素
解决线程冲突
由异步的机制可以知道,BeginSend和回调函数往往执行于不同的线程,如果多个线程同时操作writeQueue,有可能引发些问题。
玩家连续点击两次发送按钮,假如运气特别差,第二次发送时,第一次发送的回调函数刚好被调用。如果线程1的Send刚好走到writeQueue.Enqueue(ba)这一行(t2时刻),按理说writeQueue.Count应为2,不应该进入if(writeQueue.Count == 1)的真分支去发送数据(因为此时writeQueue.Count== 2)。但假如在条件判断之前,回调线程刚好执行了writeQueue.Dequeue()(t3时刻),由于writeQueue里只有1个元素,在t4时刻主线程判断if(writeQueue.Count == 1)时,条件成立,会发送数据。但SendCallback中ba = writeQueue.First()也会获取到队列的第一条数据,也会把它发送出去。第二次发送的数据将会被发送两次,显然不是我们需要的。
为了避免线程竞争,可以通过加锁(lock)的方式处理。当两个线程争夺一个锁的时候,一个线程等待,被阻止的那个锁变为可用
//发送缓冲区
Queue<ByteArray> writeQueue = new Queue<ByteArray>();
//点击发送按钮
public void Send()
{
//拼接字节,省略组装sendBytes的代码
byte[] sendBytes = 要发送的数据;
ByteArray ba = new ByteArray(sendBytes);
int count = 0;
lock(writeQueue){
writeQueue.Enqueue(ba);
count = writeQueue.Count;
}
//send
if(count == 1){
socket.BeginSend(sendBytes, 0, sendBytes.Length,
0, SendCallback, socket);
}
Debug.Log("[Send]" + BitConverter.ToString(sendBytes));
}
//Send回调
public void SendCallback(IAsyncResult ar){
//获取state、EndSend的处理
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndSend(ar);
ByteArray ba;
lock(writeQueue){
ba = writeQueue.First();
}
ba.readIdx+=count;
if(count == ba.length){
lock(writeQueue){
writeQueue.Dequeue();
ba = writeQueue.First();
}
}
if(ba ! = null){
socket.BeginSend(ba.bytes, ba.readIdx, ba.length,
0, SendCallback, socket);
}
}
以上代码把临界区设计得很小,拥有较高的执行效率。
参考书籍:《Unity3D网络游戏实战(第2版)》 (豆瓣) (douban.com)