Zinx - V0.5 消息封装
- 之前我们使用Request来保存服务器的数据,很显然使用[]byte来接收数据,没有长度也没有消息类型,接下来就要针对这个消息进行封装
创建消息类型
定义一个基本的message包,会包含消息ID、数据、数据长度三个成员,并提供基本的setter和getter方法
imssage.go接口
package ziface
//将请求的消息封装到一个Message中, 定义抽象的接口
type IMessage interface {
//获取消息的ID
GetMsgId() uint32
//获取消息的长度
GetMsgLen() uint32
//获取消息的内容
GetData() []byte
//设置消息的ID
SetMsgId(uint32)
//设置消息的内容
SetData([]byte)
//设置消息的长度
SetDataLen(uint32)
}
message.go实现类
package znet
type Message struct {
Id uint32 //消息的ID
DataLen uint32 //消息的长度
Data []byte //消息的内容
}
//创建一个Message消息包
func NewMsgPackage(id uint32, data []byte) *Message {
return &Message{
Id: id,
DataLen: uint32(len(data)),
Data: data,
}
}
//获取消息的ID
func (m *Message) GetMsgId() uint32 {
return m.Id
}
//获取消息的长度
func (m *Message) GetMsgLen() uint32 {
return m.DataLen
}
//获取消息的内容
func (m *Message) GetData() []byte {
return m.Data
}
//设置消息的ID
func (m *Message) SetMsgId(id uint32) {
m.Id = id
}
//设置消息的内容
func (m *Message) SetData(data []byte) {
m.Data = data
}
//设置消息的长度
func (m *Message) SetDataLen(len uint32) {
m.DataLen = len
}
消息的粘包
这里我们使用 TLV (Type-Len-Value) 封包格式解决TCP粘包问题,由于Zinx也是TCP流的形式传播数据,难免会出现消息1和消息2⼀同发送,那么zinx就需要有能⼒区分两个消息的边界,所以Zinx此时应该提供⼀个统⼀的拆包和封包的⽅法。
- 封包:在发包之前打包成如上图这种格式的有head和body的两部分的包
- 拆包:在收到数据的时候分两次进行读取,先读取固定长度的head部分,得到后续Data的长度,再根据DataLen读取之后的body。
封包拆包的实现
需要注意的是,封包针对的是IMessage数据来封,返回的是二进制序列化后的结果,即把message结构体变成二进制序列化的数据,拆包针对的是二进制的数据流,得到的是IMessage数据,即把二进制序列化的数据变成message结构体
idatapack.go
package ziface
//封包、拆包 模块
//直接面向TCP连接中的数据流, 用于处理TCP粘包问题
type IDataPack interface {
//获取包的头的长度方法
GetHeadLen() uint32
//封包方法
Pack(msg IMessage) ([]byte, error)
//拆包方法
Unpack([]byte) (IMessage, error)
}
datapack.go
package znet
import (
"bytes"
"encoding/binary"
"errors"
"zinx/utils"
"zinx/ziface"
)
//封包,拆包的具体模块
type DataPack struct{}
//拆包封包实例的一个初始化方法
func NewDataPack() *DataPack {
return &DataPack{}
}
//获取包的头的长度方法
func (dp *DataPack) GetHeadLen() uint32 {
//Datalen uint32(4字节) + ID uint32(4字节)
return 8
}
//封包方法
//|datelen|msgID|data|
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
//创建一个存放bytes字节的缓冲
dataBuff := bytes.NewBuffer([]byte{})
//将dataLen写进databuff中
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgLen()); err != nil {
return nil, err
}
//将MsgId 写进databuff中
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
return nil, err
}
//将data数据 写进databuff中
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
return nil, err
}
return dataBuff.Bytes(), nil
}
//拆包方法 (将包的Head信息都出来) 之后再根据head信息里的data的长度,再进行一次读
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
//创建一个从输入二进制数据的ioReader
dataBuff := bytes.NewReader(binaryData)
//只解压head信息,得到datalen和MsgID
msg := &Message{}
//读dataLen
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
return nil, err
}
//读MsgID
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
return nil, err
}
//判断datalen是否已经超出了我们允许的最大包长度
if utils.GlobalObject.MaxPackageSize > 0 && msg.DataLen > utils.GlobalObject.MaxPackageSize {
return nil, errors.New("too Large msg data recv!")
}
return msg, nil
}
注意这里的unpack()只读取了datalen和MsgID,并没有读取data
单元测试
- 单元测试思路
模拟服务器创建socketTCP,并使用go协程承载从客户端读取数据、进行拆包处理的业务
//只是负责测试datapack拆包 封包的单元测试
func TestDataPack(t *testing.T) {
//模拟的服务器
//1 创建socketTCP
listenner, err := net.Listen("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("server listen err: ", err)
return
}
//创建一个go 承载 负责从客户端处理业务
go func() {
//2 从客户端读取数据,拆包处理
for {
conn, err := listenner.Accept()
if err != nil {
fmt.Println("server accept error", err)
}
go func(conn net.Conn) {
//处理客户端的请求
//------> 拆包的过程 <------
//定义一个拆包的对象dp
dp := NewDataPack()
for {
// 1第一次从conn读, 把包的head读出来
headData := make([]byte, dp.GetHeadLen())
_, err := io.ReadFull(conn, headData)
if err != nil {
fmt.Println("read head error")
break
}
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("server unpacke err ", err)
return
}
if msgHead.GetMsgLen() > 0 {
//msg是有数据的, 需要进行第二次读取
//2 第二次从conn读, 根据head中的datalen 再读取data内容
msg := msgHead.(*Message)
msg.Data = make([]byte, msg.GetMsgLen())
//根据datalen的长度再次从io流中读取
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack data err: ", err)
return
}
//完整的一个消息已经读取完毕
fmt.Println("---> Recv MsgID: ", msg.Id, ", datalen = ", msg.DataLen, "data = ", string(msg.Data))
}
}
}(conn)
}
}()
从客户端读取数据进行拆包处理的业务需要用for循环进行阻塞,等待客户端的连接,然后开启一个协程处理客户端请求并将conn作为形参传入,这个协程实际上是承载的拆包的业务,进行两次读取,第一次把包的head读出来,第二次根据head中的dataLen读取data内容。我们首先定义一个拆包对象,这个对象提供拆包方法,同时读包也是用一个for循环去读,第一次将包的head二进制流读出来后要将其封装到message中,即调用unpack()方法,返回一个只有dataLen和MsgID两个字段的结构体。此时就可以进行第二次读取了,我们想要将第二次读取的数据添加到结构体的data字段里,但需要注意的是此时的msgHead是imessage接口类型,没有data字段,所以我们需要进行类型断言将其转换为message结构体类型
模拟客户端启动go程序,将两个包组合发送模拟粘包
//模拟客户端
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client dial err: ", err)
return
}
//创建一个封包对象 dp
dp := NewDataPack()
//模拟粘包过程,封装两个msg一同发送
//封装第一个msg1包
msg1 := &Message{
Id: 1,
DataLen: 4,
Data: []byte{'z', 'i', 'n', 'x'},
}
sendData1, err := dp.Pack(msg1)
if err != nil {
fmt.Println("client pack msg1 error", err)
return
}
//封装第二个msg2包
msg2 := &Message{
Id: 2,
DataLen: 7,
Data: []byte{'n', 'i', 'h', 'a', 'o', '!', '!'},
}
sendData2, err := dp.Pack(msg2)
if err != nil {
fmt.Println("client pack msg1 error", err)
return
}
//将两个包粘在一起
sendData1 = append(sendData1, sendData2...)
//一次性发送给服务端
conn.Write(sendData1)
//客户端阻塞
select {}
创建一个封包对象,封装两个message一同发送,发送完之后主进程不能结束,要等待客户端返回,否则发送完进程结束客户端就销毁了,我们使用select阻塞
消息封装集成到Zinx框架
- 将Message添加到Request属性中
package ziface
//IReqeust接口:
//实际上是把客户端请求的链接信息, 和 请求的数据 包装到了一个Request中
type IRequest interface {
//得到当前链接
GetConnection() IConneciton
//得到请求的消息数据
GetData() []byte
//得到请求的消息ID
GetMsgID() uint32
}
package znet
import (
"zinx/ziface"
)
type Request struct {
//已经和客户端建立好的链接
conn ziface.IConneciton
//客户端请求的数据
msg ziface.IMessage
}
//得到当前链接
func (r *Request) GetConnection() ziface.IConneciton {
return r.conn
}
//得到请求的消息数据
func (r *Request) GetData() []byte {
return r.msg.GetData()
}
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsgId()
}
- 修改链接读取数据的机制,将之前的单纯的读取byte改成拆包形式的读取按照TLV形式读取
首先还是创建一个拆包解包对象,读取客户端的Msg Head二级制流8个字节,然后拆包,得到msgID和msgDatalen并放在msg消息中,最后根据dataLen再次读取Data,放在msg.Data中
//链接的读业务方法
func (c *Connection) StartReader() {
fmt.Println(" Reader Goroutine is running...")
defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())
defer c.Stop()
for {
//读取客户端的数据到buf中
//buf := make([]byte, utils.GlobalObject.MaxPackageSize)
//_, err := c.Conn.Read(buf)
//if err != nil {
// fmt.Println("recv buf err", err)
// continue
//}
//创建一个拆包解包对象
dp := NewDataPack()
//读取客户端的Msg Head 二级制流 8个字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error", err)
break
}
//拆包,得到msgID 和 msgDatalen 放在msg消息中
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error", err)
break
}
//根据dataLen 再次读取Data, 放在msg.Data中
var data []byte
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error ", err)
break
}
}
msg.SetData(data)
//得到当前conn数据的Request请求数据
req := Request{
conn: c,
msg: msg,
}
//执行注册的路由方法
go func(request ziface.IRequest) {
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
//从路由中,找到注册绑定的Conn对应的router调用
}
}
- 给链接提供一个发包机制: 将发送的消息进行打包,再发送
修改StartReader方法
//链接的读业务方法
func (c *Connection) StartReader() {
fmt.Println(" Reader Goroutine is running...")
defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())
defer c.Stop()
for {
//读取客户端的数据到buf中
//buf := make([]byte, utils.GlobalObject.MaxPackageSize)
//_, err := c.Conn.Read(buf)
//if err != nil {
// fmt.Println("recv buf err", err)
// continue
//}
//创建一个拆包解包对象
dp := NewDataPack()
//读取客户端的Msg Head 二级制流 8个字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error", err)
break
}
//拆包,得到msgID 和 msgDatalen 放在msg消息中
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error", err)
break
}
//根据dataLen 再次读取Data, 放在msg.Data中
var data []byte
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error ", err)
break
}
}
msg.SetData(data)
//得到当前conn数据的Request请求数据
req := Request{
conn: c,
msg: msg,
}
//执行注册的路由方法
go func(request ziface.IRequest) {
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
//从路由中,找到注册绑定的Conn对应的router调用
}
}
添加SendMsg方法
package ziface
import "net"
//定义链接模块的抽象层
type IConneciton interface {
//启动链接 让当前的链接准备开始工作
Start()
//停止链接 结束当前链接的工作
Stop()
//获取当前链接的绑定socket conn
GetTCPConnection() *net.TCPConn
//获取当前链接模块的链接ID
GetConnID() uint32
//获取远程客户端的 TCP状态 IP port
RemoteAddr() net.Addr
//发送数据, 将数据发送给远程的客户端
SendMsg(msgId uint32, data []byte) error
}
//定义一个处理链接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error
实现SendMsg方法
//提供一个SendMsg方法 将我们要发送给客户端的数据,先进行封包,再发送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send msg")
}
//将data进行封包 MsgDataLen|MsgID|Data
dp := NewDataPack()
//MsgDataLen|MsgID|Data
binaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg")
}
//将数据发送给客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("Write msg id ", msgId, " error :", err)
return errors.New("conn Write error")
}
return nil
}
Zinx框架开发
server.go
package main
import (
"fmt"
"zinx/ziface"
"zinx/znet"
)
//基于Zinx框架来开发的 服务器端应用程序
//ping test 自定义路由
type PingRouter struct {
znet.BaseRouter
}
//Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call Router Handle...")
//先读取客户端的数据,再回写ping..ping...ping
fmt.Println("recv from client: msgID = ", request.GetMsgID(),
", data = ", string(request.GetData()))
err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
func main() {
//1 创建一个server句柄,使用Zinx的api
s := znet.NewServer("[zinx V0.5]")
//2 给当前zinx框架添加一个自定义的router
s.AddRouter(&PingRouter{})
//3 启动server
s.Serve()
}
client.go
package main
import (
"fmt"
"io"
"net"
"time"
"zinx/znet"
)
//模拟客户端
func main() {
fmt.Println("client start...")
time.Sleep(1 * time.Second)
//1 直接链接远程服务器,得到一个conn链接
conn, err := net.Dial("tcp", "127.0.0.1:8999")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
//发送封包的message消息 MsgID:0
dp := znet.NewDataPack()
binaryMsg, err := dp.Pack(znet.NewMsgPackage(0, []byte("ZinxV0.5 client Test Message")))
if err != nil {
fmt.Println("Pack error:", err)
return
}
if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("write error", err)
return
}
//服务器就应该给我们回复一个message数据, MsgID:1 pingpingping
// 1 先读取流中的head部分 得到ID 和 dataLen
binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("read head error ", err)
break
}
// 将二进制的head拆包到msg 结构体中
msgHead, err := dp.Unpack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead error ", err)
break
}
if msgHead.GetMsgLen() > 0 {
// 2再根据DataLen进行第二次读取,将data读出来
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error , ", err)
return
}
fmt.Println("---> Recv Server Msg : ID = ", msg.Id, ", len = ", msg.DataLen, ", data = ", string(msg.Data))
}
//cpu阻塞
time.Sleep(1 * time.Second)
}
}