从0到1开发go-tcp框架【2-实现Message模块、解决TCP粘包问题、实现多路由机制】
1 实现\封装Message模块
zinx/ziface/imessage.go
package ziface
type IMessage interface {
GetMsdId() uint32
GetMsgLen() uint32
GetMsgData() []byte
SetMsgId(uint32)
SetData([]byte)
SetDataLen(uint32)
}
zinx/znet/message.go
package znet
type Message struct {
//消息id
Id uint32
//消息长度
DataLen uint32
//消息内容
Data []byte
}
func (m *Message) GetMsdId() uint32 {
return m.Id
}
func (m *Message) GetMsgLen() uint32 {
return m.DataLen
}
func (m *Message) GetMsgData() []byte {
return m.Data
}
func (m *Message) SetMsgId(id uint32) {
m.Id = id
}
func (m *Message) SetData(data []byte) {
m.Data = data
}
func (m *Message) SetDataLen(len uint32) {
m.DataLen = len
}
2 解决TCP粘包问题(TLV方式)
2.1 解决思路
大家都知道TCP是一种流式传输(所谓流式,也就是没有截止,因此会出现粘包的问题,因为我们不知道读多少数据结束一个包)
解决思路:TLV:type、length、value
- 每个数据包都封装上TLV,告诉对方我们消息的类型,我们消息的长度(设定为占固定长度,如8字节)。
- 这样对方在接受的时候,每次先读8字节,拿到类型和长度,最后再根据类型和长度读取对应数量的数据
2.2 封包拆包过程实现
①zinx/ziface/idatapack.go
package ziface
type IDataPack interface {
//获取包头的长度
GetHeadLen() uint32
//封包方法1
Pack(msg IMessage) ([]byte, error)
//拆包
UnPack([]byte) (IMessage, error)
}
②zinx/znet/datapack.go
实现封包,拆包方法
- 写入数据头
package znet
import (
"bytes"
"encoding/binary"
"github.com/kataras/iris/v12/x/errors"
"myTest/zinx/util"
"myTest/zinx/ziface"
)
type DataPack struct {
}
func NewDataPack() *DataPack {
return &DataPack{}
}
//获取包头的长度
func (dp *DataPack) GetHeadLen() uint32 {
//DataLen uint32 4字节 + ID uint32 4字节,固定包头的长度
return 8
}
//封包方法
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
//创建一个存放bytes字节的缓冲
dataBuf := bytes.NewBuffer([]byte{})
//包的格式【包长度、包Id、包数据】
//1 先写dataLen写入dataBuf中,采用小端写
if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetMsgLen()); err != nil {
return nil, err
}
//2 写入msgId
if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetMsdId()); err != nil {
return nil, err
}
//3 写入具体数据
if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetMsgData()); err != nil {
return nil, err
}
return dataBuf.Bytes(), nil
}
//拆包:将包的head信息都提取出来(包的id、长度),然后再根据包的长度一次性读取数据
func (dp *DataPack) UnPack(binaryData []byte) (ziface.IMessage, error) {
dataBuf := bytes.NewReader(binaryData)
//先解压head信息,得到dataLen和msgId
msg := &Message{}
//dataLen
if err := binary.Read(dataBuf, binary.LittleEndian, &msg.DataLen); err != nil {
return nil, err
}
//msgId
if err := binary.Read(dataBuf, binary.LittleEndian, &msg.Id); err != nil {
return nil, err
}
//判断dataLen是否已经超过了我们在zinx.json配置文件中所允许的包最大长度
if util.GlobalObject.MaxPackageSize > 0 && msg.DataLen > util.GlobalObject.MaxPackageSize {
return nil, errors.New("too large msg data receive")
}
//msg中只包含:dataLen和dataId
return msg, nil
}
③测试:zinx/znet/datapack_test.go
在测试的时候可以先把util/globalobj.go中
GlobalObject.Reload()
注释掉,因为我们是通过go自带的test框架测试,所以会读取不到配置文件
zinx/znet/datapack_test.go
注意:go的test文件名必须是xxxx_test.go
package znet
import (
"fmt"
"io"
"net"
"testing"
)
//测试dataPack的拆包、封包
func TestDataPack(t *testing.T) {
/*
1 模拟服务器
*/
listener, err := net.Listen("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("server listen err ", err)
return
}
//启动协程,用于处理客户端的业务
go func() {
//2 从客户端读取数据,进行拆包
conn, err := listener.Accept()
if err != nil {
fmt.Println("server accept err ", err)
return
}
go func(conn net.Conn) {
//处理客户端的请求
//>-----拆包过程------<
dp := NewDataPack()
for {
// ①第一次从conn中读,将包中的head读取出来[我们定义的headLen默认是8字节]
headData := make([]byte, dp.GetHeadLen())
_, err := io.ReadFull(conn, headData)
if err != nil {
fmt.Println("read head err ", err)
return
}
//解析headData
msgHead, err := dp.UnPack(headData)
if err != nil {
fmt.Println("server unpack err ", err)
return
}
if msgHead.GetMsgLen() > 0 {
//msg中是有数据的,需要进行第二次读取
//②第二次读取,是根据head中的dataLen来读取data内容
msg := msgHead.(*Message)
//根据数据包中的数据长度创建对应的切片
msg.Data = make([]byte, msg.GetMsgLen())
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack err ", err)
return
}
//完整的一个消息已经读取完毕
fmt.Println("----->Receive MsgID:", msg.Id, "dataLen=", msg.DataLen, ",/data=", string(msg.Data))
}
}
}(conn)
}()
/*
模拟客户端发送数据包
*/
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client dial err ", err)
return
}
//创建一个封包对象
dp := NewDataPack()
//模拟粘包过程,封装两个msg一同发送
msg1 := &Message{
Id: 1,
DataLen: 4,
Data: []byte{'z', 'i', 'n', 'x'},
}
msg2 := &Message{
Id: 2,
DataLen: 8,
Data: []byte{'h', 'e', 'l', 'l', 'o', ' ', 'y', 'a'},
}
//将两个数据包粘在一起[将数据进行打包],打包最后的结果还是一个[]byte切片
sendData1, err := dp.Pack(msg1)
if err != nil {
fmt.Println("Client pack msg1 err ", err)
return
}
sendData2, err := dp.Pack(msg2)
if err != nil {
fmt.Println("Client pack msg2 err ", err)
return
}
//需要使用sendData2,将数据打散,否则会成为切片中嵌套切片
sendData1 = append(sendData1, sendData2...)
//一次性将全部数据发送给服务端
conn.Write(sendData1)
//阻塞,查看控制台打印结果是否正确
select {}
}
2.3 zinx框架集成消息封装机制
将消息封装机制集成到我们自定义的zinx框架中
- 将zinx/znet/connection.go中的StartReader方法使用封装后的消息实现
- 将zinx/znet/request.go中的data改为IMessage
- 在zinx/znet/message.go中添加一个NewMessage的方法
- 在zinx/znet/connection.go中新增SendMsg方法
①zinx/ziface/iconnection.go
package ziface
import "net"
type IConnection interface {
//启动连接
Start()
//停止连接
Stop()
//获取当前连接的Conn对象
GetTCPConnection() *net.TCPConn
//获取当前连接模块的id
GetConnectionID() uint32
//获取远程客户端的TCP状态 IP:Port
RemoteAddr() net.Addr
//发送数据
SendMsg(msgId uint32, data []byte) error
}
//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error
②zinx/znet/connection.go
package znet
import (
"fmt"
"github.com/kataras/iris/v12/x/errors"
"io"
"myTest/zinx/ziface"
"net"
)
type Connection struct {
Conn *net.TCPConn
ConnID uint32
isClosed bool
//告知当前的连接已经退出
ExitChan chan bool
Router ziface.IRouter
}
func NewConnection(conn *net.TCPConn, connID uint32, router ziface.IRouter) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
Router: router,
isClosed: false,
ExitChan: make(chan bool, 1),
}
return c
}
func (c *Connection) StartReader() {
fmt.Println("reader goroutine is running...")
defer fmt.Println("connID=", c.ConnID, "Reader is exit, remote addr is ", c.RemoteAddr().String())
defer c.Stop()
//读取数据
for {
//buf := make([]byte, util.GlobalObject.MaxPackageSize)
//_, err := c.Conn.Read(buf)
//if err != nil {
// fmt.Printf("connID %d receive buf err %s\n", c.ConnID, err)
// continue
//}
//创建一个拆包对象
dp := NewDataPack()
//读取客户端的msg Head 二进制流 8字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head err ", err)
break
}
//拆包,将读取到的headData封装为msg
msg, err := dp.UnPack(headData)
if err != nil {
fmt.Println("unpack msg err ", err)
break
}
//根据dataLen,再次读取Data,放在msg.Data中,
var data []byte
//如果数据包中有数据,则读取
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
//将切片data读满
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data err ", err)
break
}
}
msg.SetData(data)
//封装请求,改为router处理
r := Request{
conn: c.Conn,
msg: msg,
}
go func(request ziface.IRequest) {
c.Router.PreHandle(request)
c.Router.Handler(request)
c.Router.PostHandler(request)
}(&r)
}
}
//启动连接
func (c *Connection) Start() {
fmt.Printf("ConnID %d is Start...", c.ConnID)
go c.StartReader()
}
//停止连接
func (c *Connection) Stop() {
fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
if c.isClosed {
return
}
c.isClosed = true
c.Conn.Close()
close(c.ExitChan)
}
//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
return c.ConnID
}
//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("connection closed\n")
}
//将data进行封包
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id=", msgId)
return errors.New("pack error msg")
}
//将数据发送给客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("write msg id ", msgId, " error ", err)
return errors.New("conn write err ")
}
return nil
}
2.4 zinx测试集成消息封装机制
注意:之前irequest.go和request.go代码有误
,修改为以下即可
- 修改部分主要为:将GetConnection更换为我们自定义的connection
/zinx/ziface/irequest.go:
package ziface
type IRequest interface {
GetConnection() IConnection
GetData() []byte
GetMsgID() uint32
}
/zinx/znet/request.go:
package znet
import (
"myTest/zinx/ziface"
)
type Request struct {
conn ziface.IConnection
msg ziface.IMessage
}
func (r *Request) GetConnection() ziface.IConnection {
return r.conn
}
func (r *Request) GetData() []byte {
return r.msg.GetMsgData()
}
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsdId()
}
①client.go
package main
import (
"fmt"
"io"
"myTest/zinx/znet"
"net"
"time"
)
/*
模拟客户端
*/
func main() {
fmt.Println("client start...")
time.Sleep(time.Second * 1)
//1 创建服务器连接
conn, err := net.Dial("tcp", "127.0.0.1:8092")
if err != nil {
fmt.Println("client start err ", err)
return
}
for {
//发送封装后的数据包
dp := znet.NewDataPack()
binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx v0.5 client test msg")))
if err != nil {
fmt.Println("client pack msg err ", err)
return
}
if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("client write err ", err)
return
}
//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...
//1 先读取流中的head部分,得到Id和dataLen
binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("client read head err ", err)
break
}
//将二进制的head拆包到msg中
msgHead, err := dp.UnPack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead err ", err)
break
}
if msgHead.GetMsgLen() > 0 {
//2 有数据, 再根据dataLen进行二次读取,将data读出来
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error ", err)
return
}
fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
}
//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
time.Sleep(time.Second * 1)
}
}
②server.go
package main
import (
"fmt"
"myTest/zinx/ziface"
"myTest/zinx/znet"
)
//自定义一个Router,测试路由功能
type PingRouter struct {
znet.BaseRouter
}
func (pr *PingRouter) Handler(request ziface.IRequest) {
fmt.Println("call router handler...")
//先读取客户端数据,再回写ping...ping...ping...
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
//回写ping
err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping..."))
if err != nil {
fmt.Println(err)
}
}
func main() {
s := znet.NewServer("[Zinx v5.0]")
//添加自定义路由
router := &PingRouter{}
s.AddRouter(router)
s.Serve()
}
测试结果:
2.5 消息管理模块(支持多路由)MsgHandler
①zinx/ziface/imsgHandler.go
package ziface
type IMsgHandler interface {
DoMsgHandler(request IRequest)
AddRouter(msgId uint32, router IRouter)
}
②zinx/znet/msgHandler.go
package znet
import (
"fmt"
"myTest/zinx/ziface"
"strconv"
)
type MsgHandle struct {
//msgId与对应的router对应
Api map[uint32]ziface.IRouter
}
func NewMsgHandle() *MsgHandle {
return &MsgHandle{
Api: make(map[uint32]ziface.IRouter),
}
}
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
//判断是否有对应的router
if _, ok := mh.Api[request.GetMsgID()]; !ok {
fmt.Println("msgId ", request.GetMsgID(), "does not exist handler, need to add router")
return
}
//call handler
router := mh.Api[request.GetMsgID()]
router.PreHandle(request)
router.Handler(request)
router.PostHandler(request)
}
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
if _, ok := mh.Api[msgId]; ok {
//如果已经存在了对应的router,则提示
panic("repeat api, msgId = " + strconv.Itoa(int(msgId)))
}
mh.Api[msgId] = router
fmt.Println("msgId ", msgId, "Add router success ")
}
2.6 消息管理模块集成到Zinx框架中[V0.6]
- 将server模块中的Router属性替换为MsgHandler
- 将server之前的AddRouter修改为调用MsgHandler的AddRouter
- 将connection模块中的Router属性修改为MsgHandler
- Connection中之前调度Router的业务替换为MsgHandler调度
①zinx/znet/server.go
package znet
import (
"fmt"
"myTest/zinx/util"
"myTest/zinx/ziface"
"net"
)
type Server struct {
Name string
IPVersion string
IP string
Port int
MsgHandler *MsgHandle
}
func NewServer(name string) *Server {
s := &Server{
Name: name,
IPVersion: "tcp4",
IP: util.GlobalObject.Host,
Port: util.GlobalObject.TcpPort,
MsgHandler: NewMsgHandle(),
}
return s
}
func (s *Server) Start() {
//启动服务监听端口
fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)
fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)
go func() {
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Printf("resolve tcp addr error %v\n", err)
return
}
listener, err := net.ListenTCP(s.IPVersion, addr)
if err != nil {
fmt.Println("listen ", s.IPVersion, " err ", err)
return
}
fmt.Println("[start] Zinx server success ", s.Name, "Listening...")
//阻塞连接,处理业务
for {
conn, err := listener.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
var cid uint32 = 0
dealConn := NewConnection(conn, cid, s.MsgHandler)
cid++
//开启goroutine处理启动当前conn
go dealConn.Start()
}
}()
}
func (s *Server) Stop() {
}
func (s *Server) Serve() {
s.Start()
//阻塞,一直读取客户端所发送过来的消息
select {}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {
s.MsgHandler.AddRouter(msgId, router)
}
②zinx/znet/connection.go
package znet
import (
"fmt"
"github.com/kataras/iris/v12/x/errors"
"io"
"net"
)
type Connection struct {
Conn *net.TCPConn
ConnID uint32
isClosed bool
//告知当前的连接已经退出
ExitChan chan bool
MsgHandler *MsgHandle
}
func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
MsgHandler: msgHandle,
isClosed: false,
ExitChan: make(chan bool, 1),
}
return c
}
func (c *Connection) StartReader() {
fmt.Println("reader goroutine is running...")
defer fmt.Println("connID=", c.ConnID, "Reader is exit, remote addr is ", c.RemoteAddr().String())
defer c.Stop()
//读取数据
for {
//创建一个拆包对象
dp := NewDataPack()
//读取客户端的msg Head 二进制流 8字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head err ", err)
break
}
//拆包,将读取到的headData封装为msg
msg, err := dp.UnPack(headData)
if err != nil {
fmt.Println("unpack msg err ", err)
break
}
//根据dataLen,再次读取Data,放在msg.Data中,
var data []byte
//如果数据包中有数据,则读取
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
//将切片data读满
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data err ", err)
break
}
}
msg.SetData(data)
//封装请求,改为router处理
r := Request{
conn: c,
msg: msg,
}
go c.MsgHandler.DoMsgHandler(&r)
}
}
//启动连接
func (c *Connection) Start() {
fmt.Printf("ConnID %d is Start...", c.ConnID)
go c.StartReader()
}
//停止连接
func (c *Connection) Stop() {
fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
if c.isClosed {
return
}
c.isClosed = true
c.Conn.Close()
close(c.ExitChan)
}
//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
return c.ConnID
}
//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("connection closed\n")
}
//将data进行封包
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id=", msgId)
return errors.New("pack error msg")
}
//将数据发送给客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("write msg id ", msgId, " error ", err)
return errors.New("conn write err ")
}
return nil
}
③测试
myDemo/ZinxV0.6/Client0.go
第一个客户端
package main
import (
"fmt"
"io"
"myTest/zinx/znet"
"net"
"time"
)
/*
模拟客户端
*/
func main() {
fmt.Println("client start...")
time.Sleep(time.Second * 1)
//1 创建服务器连接
conn, err := net.Dial("tcp", "127.0.0.1:8092")
if err != nil {
fmt.Println("client start err ", err)
return
}
for {
//发送封装后的数据包
dp := znet.NewDataPack()
binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))
if err != nil {
fmt.Println("client pack msg err ", err)
return
}
if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("client write err ", err)
return
}
//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...
//1 先读取流中的head部分,得到Id和dataLen
binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("client read head err ", err)
break
}
//将二进制的head拆包到msg中
msgHead, err := dp.UnPack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead err ", err)
break
}
if msgHead.GetMsgLen() > 0 {
//2 有数据, 再根据dataLen进行二次读取,将data读出来
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error ", err)
return
}
fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
}
//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
time.Sleep(time.Second * 1)
}
}
myDemo/ZinxV0.6/Client1.go
package main
import (
"fmt"
"io"
"myTest/zinx/znet"
"net"
"time"
)
/*
模拟客户端
*/
func main() {
fmt.Println("client start...")
time.Sleep(time.Second * 1)
//1 创建服务器连接
conn, err := net.Dial("tcp", "127.0.0.1:8092")
if err != nil {
fmt.Println("client start err ", err)
return
}
for {
//发送封装后的数据包
dp := znet.NewDataPack()
binaryMsg, err := dp.Pack(znet.NewMessage(1, []byte("Zinx client1 test msg")))
if err != nil {
fmt.Println("client pack msg err ", err)
return
}
if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("client write err ", err)
return
}
//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...
//1 先读取流中的head部分,得到Id和dataLen
binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("client read head err ", err)
break
}
//将二进制的head拆包到msg中
msgHead, err := dp.UnPack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead err ", err)
break
}
if msgHead.GetMsgLen() > 0 {
//2 有数据, 再根据dataLen进行二次读取,将data读出来
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error ", err)
return
}
fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
}
//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
time.Sleep(time.Second * 1)
}
}
myDemo/ZinxV0.6/Server.go
package main
import (
"fmt"
"myTest/zinx/ziface"
"myTest/zinx/znet"
)
//自定义一个Router,测试路由功能
type PingRouter struct {
znet.BaseRouter
}
func (pr *PingRouter) Handler(request ziface.IRequest) {
fmt.Println("call router handler...")
//先读取客户端数据,再回写ping...ping...ping...
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
//回写ping
err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))
if err != nil {
fmt.Println(err)
}
}
//定义第二个Router
type HelloRouter struct {
znet.BaseRouter
}
func (hr *HelloRouter) Handler(request ziface.IRequest) {
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))
if err != nil {
fmt.Println(err)
}
}
func main() {
s := znet.NewServer("[Zinx v0.6]")
//添加自定义路由(PingRouter和HelloRouter)
router0 := &PingRouter{}
s.AddRouter(0, router0)
router1 := &HelloRouter{}
s.AddRouter(1, router1)
s.Serve()
}
测试结果:
Zinx正确接受了不同客户端的请求,并根据不同的请求做出了不同的处理
- 根据msgId和注册handler来对应处理不同请求