从0到1开发go-tcp框架【2-实现Message模块、解决TCP粘包问题、实现多路由机制】

news2025/1/10 22:19:24

从0到1开发go-tcp框架【2-实现Message模块、解决TCP粘包问题、实现多路由机制】

1 实现\封装Message模块

zinx/ziface/imessage.go

package ziface

type IMessage interface {
	GetMsdId() uint32
	GetMsgLen() uint32
	GetMsgData() []byte

	SetMsgId(uint32)
	SetData([]byte)
	SetDataLen(uint32)
}

zinx/znet/message.go

package znet

type Message struct {
	//消息id
	Id uint32
	//消息长度
	DataLen uint32
	//消息内容
	Data []byte
}

func (m *Message) GetMsdId() uint32 {
	return m.Id
}
func (m *Message) GetMsgLen() uint32 {
	return m.DataLen
}
func (m *Message) GetMsgData() []byte {
	return m.Data
}

func (m *Message) SetMsgId(id uint32) {
	m.Id = id
}
func (m *Message) SetData(data []byte) {
	m.Data = data
}
func (m *Message) SetDataLen(len uint32) {
	m.DataLen = len
}

2 解决TCP粘包问题(TLV方式)

2.1 解决思路

大家都知道TCP是一种流式传输(所谓流式,也就是没有截止,因此会出现粘包的问题,因为我们不知道读多少数据结束一个包)
解决思路:TLV:type、length、value

  • 每个数据包都封装上TLV,告诉对方我们消息的类型,我们消息的长度(设定为占固定长度,如8字节)。
  • 这样对方在接受的时候,每次先读8字节,拿到类型和长度,最后再根据类型和长度读取对应数量的数据

2.2 封包拆包过程实现

①zinx/ziface/idatapack.go

package ziface

type IDataPack interface {
	//获取包头的长度
	GetHeadLen() uint32
	//封包方法1
	Pack(msg IMessage) ([]byte, error)
	//拆包
	UnPack([]byte) (IMessage, error)
}

②zinx/znet/datapack.go

实现封包,拆包方法

  • 写入数据头
package znet

import (
	"bytes"
	"encoding/binary"
	"github.com/kataras/iris/v12/x/errors"
	"myTest/zinx/util"
	"myTest/zinx/ziface"
)

type DataPack struct {
}

func NewDataPack() *DataPack {
	return &DataPack{}
}

//获取包头的长度
func (dp *DataPack) GetHeadLen() uint32 {
	//DataLen uint32 4字节 + ID uint32 4字节,固定包头的长度
	return 8
}

//封包方法
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
	//创建一个存放bytes字节的缓冲
	dataBuf := bytes.NewBuffer([]byte{})
	//包的格式【包长度、包Id、包数据】
	//1 先写dataLen写入dataBuf中,采用小端写
	if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetMsgLen()); err != nil {
		return nil, err
	}
	//2 写入msgId
	if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetMsdId()); err != nil {
		return nil, err
	}
	//3 写入具体数据
	if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetMsgData()); err != nil {
		return nil, err
	}
	return dataBuf.Bytes(), nil
}

//拆包:将包的head信息都提取出来(包的id、长度),然后再根据包的长度一次性读取数据
func (dp *DataPack) UnPack(binaryData []byte) (ziface.IMessage, error) {
	dataBuf := bytes.NewReader(binaryData)
	//先解压head信息,得到dataLen和msgId
	msg := &Message{}
	//dataLen
	if err := binary.Read(dataBuf, binary.LittleEndian, &msg.DataLen); err != nil {
		return nil, err
	}
	//msgId
	if err := binary.Read(dataBuf, binary.LittleEndian, &msg.Id); err != nil {
		return nil, err
	}
	//判断dataLen是否已经超过了我们在zinx.json配置文件中所允许的包最大长度
	if util.GlobalObject.MaxPackageSize > 0 && msg.DataLen > util.GlobalObject.MaxPackageSize {
		return nil, errors.New("too large msg data receive")
	}
	//msg中只包含:dataLen和dataId
	return msg, nil
}

③测试:zinx/znet/datapack_test.go

在测试的时候可以先把util/globalobj.go中GlobalObject.Reload()注释掉,因为我们是通过go自带的test框架测试,所以会读取不到配置文件

zinx/znet/datapack_test.go
注意:go的test文件名必须是xxxx_test.go

package znet

import (
	"fmt"
	"io"
	"net"
	"testing"
)

//测试dataPack的拆包、封包
func TestDataPack(t *testing.T) {
	/*
		1 模拟服务器
	*/
	listener, err := net.Listen("tcp", "127.0.0.1:7777")
	if err != nil {
		fmt.Println("server listen err ", err)
		return
	}
	//启动协程,用于处理客户端的业务
	go func() {
		//2 从客户端读取数据,进行拆包
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println("server accept err ", err)
			return
		}
		go func(conn net.Conn) {
			//处理客户端的请求
			//>-----拆包过程------<
			dp := NewDataPack()
			for {
				// ①第一次从conn中读,将包中的head读取出来[我们定义的headLen默认是8字节]
				headData := make([]byte, dp.GetHeadLen())
				_, err := io.ReadFull(conn, headData)
				if err != nil {
					fmt.Println("read head err ", err)
					return
				}
				//解析headData
				msgHead, err := dp.UnPack(headData)
				if err != nil {
					fmt.Println("server unpack err ", err)
					return
				}
				if msgHead.GetMsgLen() > 0 {
					//msg中是有数据的,需要进行第二次读取
					//②第二次读取,是根据head中的dataLen来读取data内容
					msg := msgHead.(*Message)
					//根据数据包中的数据长度创建对应的切片
					msg.Data = make([]byte, msg.GetMsgLen())
					_, err := io.ReadFull(conn, msg.Data)
					if err != nil {
						fmt.Println("server unpack err ", err)
						return
					}
					//完整的一个消息已经读取完毕
					fmt.Println("----->Receive MsgID:", msg.Id, "dataLen=", msg.DataLen, ",/data=", string(msg.Data))
				}
			}
		}(conn)
	}()

	/*
		模拟客户端发送数据包
	*/
	conn, err := net.Dial("tcp", "127.0.0.1:7777")
	if err != nil {
		fmt.Println("client dial err ", err)
		return
	}
	//创建一个封包对象
	dp := NewDataPack()
	//模拟粘包过程,封装两个msg一同发送
	msg1 := &Message{
		Id:      1,
		DataLen: 4,
		Data:    []byte{'z', 'i', 'n', 'x'},
	}
	msg2 := &Message{
		Id:      2,
		DataLen: 8,
		Data:    []byte{'h', 'e', 'l', 'l', 'o', ' ', 'y', 'a'},
	}
	//将两个数据包粘在一起[将数据进行打包],打包最后的结果还是一个[]byte切片
	sendData1, err := dp.Pack(msg1)
	if err != nil {
		fmt.Println("Client pack msg1 err ", err)
		return
	}
	sendData2, err := dp.Pack(msg2)
	if err != nil {
		fmt.Println("Client pack msg2 err ", err)
		return
	}
	//需要使用sendData2,将数据打散,否则会成为切片中嵌套切片
	sendData1 = append(sendData1, sendData2...)

	//一次性将全部数据发送给服务端
	conn.Write(sendData1)
	//阻塞,查看控制台打印结果是否正确
	select {}
}

在这里插入图片描述

2.3 zinx框架集成消息封装机制

将消息封装机制集成到我们自定义的zinx框架中

  • 将zinx/znet/connection.go中的StartReader方法使用封装后的消息实现
    在这里插入图片描述
  • 将zinx/znet/request.go中的data改为IMessage
  • 在zinx/znet/message.go中添加一个NewMessage的方法
  • 在zinx/znet/connection.go中新增SendMsg方法

①zinx/ziface/iconnection.go

package ziface

import "net"

type IConnection interface {
	//启动连接
	Start()
	//停止连接
	Stop()
	//获取当前连接的Conn对象
	GetTCPConnection() *net.TCPConn
	//获取当前连接模块的id
	GetConnectionID() uint32
	//获取远程客户端的TCP状态 IP:Port
	RemoteAddr() net.Addr
	//发送数据
	SendMsg(msgId uint32, data []byte) error
}

//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error

②zinx/znet/connection.go

package znet

import (
	"fmt"
	"github.com/kataras/iris/v12/x/errors"
	"io"
	"myTest/zinx/ziface"
	"net"
)

type Connection struct {
	Conn     *net.TCPConn
	ConnID   uint32
	isClosed bool
	//告知当前的连接已经退出
	ExitChan chan bool
	Router   ziface.IRouter
}

func NewConnection(conn *net.TCPConn, connID uint32, router ziface.IRouter) *Connection {
	c := &Connection{
		Conn:     conn,
		ConnID:   connID,
		Router:   router,
		isClosed: false,
		ExitChan: make(chan bool, 1),
	}
	return c
}

func (c *Connection) StartReader() {
	fmt.Println("reader goroutine is running...")
	defer fmt.Println("connID=", c.ConnID, "Reader is exit, remote addr is ", c.RemoteAddr().String())
	defer c.Stop()
	//读取数据
	for {
		//buf := make([]byte, util.GlobalObject.MaxPackageSize)
		//_, err := c.Conn.Read(buf)
		//if err != nil {
		//	fmt.Printf("connID %d receive buf err %s\n", c.ConnID, err)
		//	continue
		//}

		//创建一个拆包对象
		dp := NewDataPack()
		//读取客户端的msg Head 二进制流 8字节
		headData := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
			fmt.Println("read msg head err ", err)
			break
		}
		//拆包,将读取到的headData封装为msg
		msg, err := dp.UnPack(headData)
		if err != nil {
			fmt.Println("unpack msg err ", err)
			break
		}
		//根据dataLen,再次读取Data,放在msg.Data中,
		var data []byte
		//如果数据包中有数据,则读取
		if msg.GetMsgLen() > 0 {
			data = make([]byte, msg.GetMsgLen())
			//将切片data读满
			if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
				fmt.Println("read msg data err ", err)
				break
			}
		}
		msg.SetData(data)

		//封装请求,改为router处理
		r := Request{
			conn: c.Conn,
			msg:  msg,
		}
		go func(request ziface.IRequest) {
			c.Router.PreHandle(request)
			c.Router.Handler(request)
			c.Router.PostHandler(request)
		}(&r)
	}
}

//启动连接
func (c *Connection) Start() {
	fmt.Printf("ConnID %d is Start...", c.ConnID)
	go c.StartReader()
}

//停止连接
func (c *Connection) Stop() {
	fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
	if c.isClosed {
		return
	}
	c.isClosed = true
	c.Conn.Close()
	close(c.ExitChan)
}

//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
	return c.Conn
}

//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
	return c.ConnID
}

//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
	return c.Conn.RemoteAddr()
}

//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
	if c.isClosed {
		return errors.New("connection closed\n")
	}
	//将data进行封包
	dp := NewDataPack()
	binaryMsg, err := dp.Pack(NewMessage(msgId, data))
	if err != nil {
		fmt.Println("Pack error msg id=", msgId)
		return errors.New("pack error msg")
	}
	//将数据发送给客户端
	if _, err := c.Conn.Write(binaryMsg); err != nil {
		fmt.Println("write msg id ", msgId, " error ", err)
		return errors.New("conn write err ")
	}
	return nil
}

2.4 zinx测试集成消息封装机制

注意:之前irequest.go和request.go代码有误,修改为以下即可

  • 修改部分主要为:将GetConnection更换为我们自定义的connection

/zinx/ziface/irequest.go:

package ziface

type IRequest interface {
	GetConnection() IConnection
	GetData() []byte
	GetMsgID() uint32
}

/zinx/znet/request.go:

package znet

import (
	"myTest/zinx/ziface"
)

type Request struct {
	conn ziface.IConnection
	msg  ziface.IMessage
}

func (r *Request) GetConnection() ziface.IConnection {
	return r.conn
}

func (r *Request) GetData() []byte {
	return r.msg.GetMsgData()
}

func (r *Request) GetMsgID() uint32 {
	return r.msg.GetMsdId()
}

①client.go

package main

import (
	"fmt"
	"io"
	"myTest/zinx/znet"
	"net"
	"time"
)

/*
模拟客户端
*/
func main() {
	fmt.Println("client start...")
	time.Sleep(time.Second * 1)
	//1 创建服务器连接
	conn, err := net.Dial("tcp", "127.0.0.1:8092")
	if err != nil {
		fmt.Println("client start err ", err)
		return
	}
	for {
		//发送封装后的数据包
		dp := znet.NewDataPack()
		binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx v0.5 client test msg")))
		if err != nil {
			fmt.Println("client pack msg err ", err)
			return
		}
		if _, err := conn.Write(binaryMsg); err != nil {
			fmt.Println("client write err ", err)
			return
		}
		//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...

		//1 先读取流中的head部分,得到Id和dataLen
		binaryHead := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(conn, binaryHead); err != nil {
			fmt.Println("client read head err ", err)
			break
		}
		//将二进制的head拆包到msg中
		msgHead, err := dp.UnPack(binaryHead)
		if err != nil {
			fmt.Println("client unpack msgHead err ", err)
			break
		}
		if msgHead.GetMsgLen() > 0 {
			//2 有数据, 再根据dataLen进行二次读取,将data读出来
			msg := msgHead.(*znet.Message)
			msg.Data = make([]byte, msg.GetMsgLen())
			if _, err := io.ReadFull(conn, msg.Data); err != nil {
				fmt.Println("read msg data error ", err)
				return
			}
			fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
		}

		//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
		time.Sleep(time.Second * 1)
	}
}

②server.go

package main

import (
	"fmt"
	"myTest/zinx/ziface"
	"myTest/zinx/znet"
)

//自定义一个Router,测试路由功能
type PingRouter struct {
	znet.BaseRouter
}

func (pr *PingRouter) Handler(request ziface.IRequest) {
	fmt.Println("call router handler...")
	//先读取客户端数据,再回写ping...ping...ping...
	fmt.Println("receive from client msgId=", request.GetMsgID(),
		"data=", string(request.GetData()))

	//回写ping
	err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping..."))
	if err != nil {
		fmt.Println(err)
	}
}

func main() {
	s := znet.NewServer("[Zinx v5.0]")
	//添加自定义路由
	router := &PingRouter{}
	s.AddRouter(router)
	s.Serve()
}

测试结果:
在这里插入图片描述

2.5 消息管理模块(支持多路由)MsgHandler

①zinx/ziface/imsgHandler.go

package ziface

type IMsgHandler interface {
	DoMsgHandler(request IRequest)
	AddRouter(msgId uint32, router IRouter)
}

②zinx/znet/msgHandler.go

package znet

import (
	"fmt"
	"myTest/zinx/ziface"
	"strconv"
)

type MsgHandle struct {
	//msgId与对应的router对应
	Api map[uint32]ziface.IRouter
}

func NewMsgHandle() *MsgHandle {
	return &MsgHandle{
		Api: make(map[uint32]ziface.IRouter),
	}
}

func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
	//判断是否有对应的router
	if _, ok := mh.Api[request.GetMsgID()]; !ok {
		fmt.Println("msgId ", request.GetMsgID(), "does not exist handler, need to add router")
		return
	}
	//call handler
	router := mh.Api[request.GetMsgID()]
	router.PreHandle(request)
	router.Handler(request)
	router.PostHandler(request)
}

func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
	if _, ok := mh.Api[msgId]; ok {
		//如果已经存在了对应的router,则提示
		panic("repeat api, msgId = " + strconv.Itoa(int(msgId)))
	}
	mh.Api[msgId] = router
	fmt.Println("msgId ", msgId, "Add router success ")
}

2.6 消息管理模块集成到Zinx框架中[V0.6]

  1. 将server模块中的Router属性替换为MsgHandler
  2. 将server之前的AddRouter修改为调用MsgHandler的AddRouter
  3. 将connection模块中的Router属性修改为MsgHandler
  4. Connection中之前调度Router的业务替换为MsgHandler调度

①zinx/znet/server.go

package znet

import (
	"fmt"
	"myTest/zinx/util"
	"myTest/zinx/ziface"
	"net"
)

type Server struct {
	Name       string
	IPVersion  string
	IP         string
	Port       int
	MsgHandler *MsgHandle
}

func NewServer(name string) *Server {
	s := &Server{
		Name:       name,
		IPVersion:  "tcp4",
		IP:         util.GlobalObject.Host,
		Port:       util.GlobalObject.TcpPort,
		MsgHandler: NewMsgHandle(),
	}
	return s
}

func (s *Server) Start() {
	//启动服务监听端口
	fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)
	fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)

	go func() {
		addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
		if err != nil {
			fmt.Printf("resolve tcp addr error %v\n", err)
			return
		}
		listener, err := net.ListenTCP(s.IPVersion, addr)
		if err != nil {
			fmt.Println("listen ", s.IPVersion, " err ", err)
			return
		}
		fmt.Println("[start] Zinx server success ", s.Name, "Listening...")
		//阻塞连接,处理业务
		for {
			conn, err := listener.AcceptTCP()
			if err != nil {
				fmt.Println("Accept err ", err)
				continue
			}
			var cid uint32 = 0
			dealConn := NewConnection(conn, cid, s.MsgHandler)
			cid++
			//开启goroutine处理启动当前conn
			go dealConn.Start()
		}
	}()
}

func (s *Server) Stop() {

}

func (s *Server) Serve() {
	s.Start()
	//阻塞,一直读取客户端所发送过来的消息
	select {}
}

func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {
	s.MsgHandler.AddRouter(msgId, router)
}

②zinx/znet/connection.go

package znet

import (
	"fmt"
	"github.com/kataras/iris/v12/x/errors"
	"io"
	"net"
)

type Connection struct {
	Conn     *net.TCPConn
	ConnID   uint32
	isClosed bool
	//告知当前的连接已经退出
	ExitChan   chan bool
	MsgHandler *MsgHandle
}

func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {
	c := &Connection{
		Conn:       conn,
		ConnID:     connID,
		MsgHandler: msgHandle,
		isClosed:   false,
		ExitChan:   make(chan bool, 1),
	}
	return c
}

func (c *Connection) StartReader() {
	fmt.Println("reader goroutine is running...")
	defer fmt.Println("connID=", c.ConnID, "Reader is exit, remote addr is ", c.RemoteAddr().String())
	defer c.Stop()
	//读取数据
	for {
		//创建一个拆包对象
		dp := NewDataPack()
		//读取客户端的msg Head 二进制流 8字节
		headData := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
			fmt.Println("read msg head err ", err)
			break
		}
		//拆包,将读取到的headData封装为msg
		msg, err := dp.UnPack(headData)
		if err != nil {
			fmt.Println("unpack msg err ", err)
			break
		}
		//根据dataLen,再次读取Data,放在msg.Data中,
		var data []byte
		//如果数据包中有数据,则读取
		if msg.GetMsgLen() > 0 {
			data = make([]byte, msg.GetMsgLen())
			//将切片data读满
			if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
				fmt.Println("read msg data err ", err)
				break
			}
		}
		msg.SetData(data)

		//封装请求,改为router处理
		r := Request{
			conn: c,
			msg:  msg,
		}
		go c.MsgHandler.DoMsgHandler(&r)
	}
}

//启动连接
func (c *Connection) Start() {
	fmt.Printf("ConnID %d is Start...", c.ConnID)
	go c.StartReader()
}

//停止连接
func (c *Connection) Stop() {
	fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
	if c.isClosed {
		return
	}
	c.isClosed = true
	c.Conn.Close()
	close(c.ExitChan)
}

//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
	return c.Conn
}

//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
	return c.ConnID
}

//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
	return c.Conn.RemoteAddr()
}

//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
	if c.isClosed {
		return errors.New("connection closed\n")
	}
	//将data进行封包
	dp := NewDataPack()
	binaryMsg, err := dp.Pack(NewMessage(msgId, data))
	if err != nil {
		fmt.Println("Pack error msg id=", msgId)
		return errors.New("pack error msg")
	}
	//将数据发送给客户端
	if _, err := c.Conn.Write(binaryMsg); err != nil {
		fmt.Println("write msg id ", msgId, " error ", err)
		return errors.New("conn write err ")
	}
	return nil
}

③测试

myDemo/ZinxV0.6/Client0.go

第一个客户端

package main

import (
	"fmt"
	"io"
	"myTest/zinx/znet"
	"net"
	"time"
)

/*
模拟客户端
*/
func main() {
	fmt.Println("client start...")
	time.Sleep(time.Second * 1)
	//1 创建服务器连接
	conn, err := net.Dial("tcp", "127.0.0.1:8092")
	if err != nil {
		fmt.Println("client start err ", err)
		return
	}
	for {
		//发送封装后的数据包
		dp := znet.NewDataPack()
		binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))
		if err != nil {
			fmt.Println("client pack msg err ", err)
			return
		}
		if _, err := conn.Write(binaryMsg); err != nil {
			fmt.Println("client write err ", err)
			return
		}
		//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...

		//1 先读取流中的head部分,得到Id和dataLen
		binaryHead := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(conn, binaryHead); err != nil {
			fmt.Println("client read head err ", err)
			break
		}
		//将二进制的head拆包到msg中
		msgHead, err := dp.UnPack(binaryHead)
		if err != nil {
			fmt.Println("client unpack msgHead err ", err)
			break
		}
		if msgHead.GetMsgLen() > 0 {
			//2 有数据, 再根据dataLen进行二次读取,将data读出来
			msg := msgHead.(*znet.Message)
			msg.Data = make([]byte, msg.GetMsgLen())
			if _, err := io.ReadFull(conn, msg.Data); err != nil {
				fmt.Println("read msg data error ", err)
				return
			}
			fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
		}

		//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
		time.Sleep(time.Second * 1)
	}
}
myDemo/ZinxV0.6/Client1.go
package main

import (
	"fmt"
	"io"
	"myTest/zinx/znet"
	"net"
	"time"
)

/*
模拟客户端
*/
func main() {
	fmt.Println("client start...")
	time.Sleep(time.Second * 1)
	//1 创建服务器连接
	conn, err := net.Dial("tcp", "127.0.0.1:8092")
	if err != nil {
		fmt.Println("client start err ", err)
		return
	}
	for {
		//发送封装后的数据包
		dp := znet.NewDataPack()
		binaryMsg, err := dp.Pack(znet.NewMessage(1, []byte("Zinx client1 test msg")))
		if err != nil {
			fmt.Println("client pack msg err ", err)
			return
		}
		if _, err := conn.Write(binaryMsg); err != nil {
			fmt.Println("client write err ", err)
			return
		}
		//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...

		//1 先读取流中的head部分,得到Id和dataLen
		binaryHead := make([]byte, dp.GetHeadLen())
		if _, err := io.ReadFull(conn, binaryHead); err != nil {
			fmt.Println("client read head err ", err)
			break
		}
		//将二进制的head拆包到msg中
		msgHead, err := dp.UnPack(binaryHead)
		if err != nil {
			fmt.Println("client unpack msgHead err ", err)
			break
		}
		if msgHead.GetMsgLen() > 0 {
			//2 有数据, 再根据dataLen进行二次读取,将data读出来
			msg := msgHead.(*znet.Message)
			msg.Data = make([]byte, msg.GetMsgLen())
			if _, err := io.ReadFull(conn, msg.Data); err != nil {
				fmt.Println("read msg data error ", err)
				return
			}
			fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
		}

		//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
		time.Sleep(time.Second * 1)
	}
}
myDemo/ZinxV0.6/Server.go
package main

import (
	"fmt"
	"myTest/zinx/ziface"
	"myTest/zinx/znet"
)

//自定义一个Router,测试路由功能
type PingRouter struct {
	znet.BaseRouter
}

func (pr *PingRouter) Handler(request ziface.IRequest) {
	fmt.Println("call router handler...")
	//先读取客户端数据,再回写ping...ping...ping...
	fmt.Println("receive from client msgId=", request.GetMsgID(),
		"data=", string(request.GetData()))

	//回写ping
	err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))
	if err != nil {
		fmt.Println(err)
	}
}

//定义第二个Router
type HelloRouter struct {
	znet.BaseRouter
}

func (hr *HelloRouter) Handler(request ziface.IRequest) {
	fmt.Println("receive from client msgId=", request.GetMsgID(),
		"data=", string(request.GetData()))
	err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))
	if err != nil {
		fmt.Println(err)
	}
}

func main() {
	s := znet.NewServer("[Zinx v0.6]")
	//添加自定义路由(PingRouter和HelloRouter)
	router0 := &PingRouter{}
	s.AddRouter(0, router0)
	router1 := &HelloRouter{}
	s.AddRouter(1, router1)
	s.Serve()
}

测试结果:
在这里插入图片描述

Zinx正确接受了不同客户端的请求,并根据不同的请求做出了不同的处理

  • 根据msgId和注册handler来对应处理不同请求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/816639.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL数据库 【索引事务】

目录 一、概念 二、索引的优缺点 1、索引的优点 2、索引的缺陷 三、索引的使用 1、查看索引 2、创建索引 3、删除索引 四、索引底层的数据结构 1、B树 2、B树 五、索引事务 1、概念和回滚 2、事务的使用 3、事务的基本特性 4、并发会遇到的问题 &#xff08…

Python程序设计基础:字典与集合(二)

文章目录 一、字典的整体操作1、字典的遍历2、字典的排序3、字典的合并 二、创建与访问集合1、集合的创建2、集合的访问 三、集合的基本操作1、集合的增、删、查2、集合的数学运算 一、字典的整体操作 字典的整体操作是指以字典为操作对象&#xff0c;对字典进行遍历、排序以及…

hdu foreverlasting and fried-chicken

题意&#xff1a; 在一个有n个点和m条边的图中找到形状是上图的子图&#xff0c;输出个数 思路&#xff1a; 仔细观察上图&#xff0c;设第二行的那个点为x&#xff0c;最后一行的点为y&#xff0c;那么可以知道&#xff0c;如果x和y都和相同的所有点中取四个点分别和xy相连…

DUBBO服务多网卡,服务调用失败

如果服务器是多网卡的&#xff0c;比如安装了docker&#xff0c;有一个docker虚拟网卡&#xff0c;一个实体网卡eth0&#xff0c;当我们运行springboot应用后&#xff0c;dubbo注入到zk的地址是 docker虚拟网卡的地址172网段&#xff0c;而不是实际内网地址192网段&#xff0c;…

OpenLayers实战,OpenLayers使用wind-layer插件实现风场动态效果

专栏目录: OpenLayers入门教程汇总目录 前言 本章讲解如何使用OpenLayers的气象风场插件wind-layer实现气象风场动态效果,该插件除了可用于OpenLayers之外,还可用于mapgl、leaflet和cesuim等二维/三维地图引擎,还是很强大的,废话少谈,让我们立刻开始实现吧。 二、依赖和…

如何将论文中的字快速复制出来?图片如何提取文字?

在日常的办公中&#xff0c;我们经常会遇到需要将纸质文件里的文字提取出来&#xff0c;再转换为电子档的情况&#xff0c;如果我们采用手动输入的话&#xff0c;不仅速度太慢&#xff0c;而且还可能因此耽误到后边的工作&#xff0c;是不是已经有小伙伴遇到这种现象&#xff0…

element中tabs组件,click事件点击拿到当前item的所有数据

话不多说&#xff0c;直接上代码&#xff1a; 添加一个:value&#xff0c;然后在用JSON.stringify(item)转一下就可以了&#xff0c;这样就会存在$attrs.value这个里面了。 接着在点击事件里面获取使用el.$attrs.value&#xff0c;注意这里在拿到这个值时&#xff0c;再用JSON…

锌离子荧光探针TSQ,109628-27-5,具有很好的选择性荧光探针

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ PART1----外观以及性质&#xff1a; 锌离子荧光探针TSQ&#xff08;CAS号&#xff1a;109628-27-5&#xff09;&#xff0c;锌离子荧光探针TSQ是用于检测锌离子的荧光探针。 TSQ与Zn离子结合后&#xff0c;吸收波长和发射波…

如何创建一个SpringBoot项目

欢迎来到南方有乔木的博客&#xff01;&#xff01;&#xff01; 博主主页&#xff1a;点击点击&#xff01;戳一戳&#xff01;&#xff01; 博主名:南方有乔木 博主简介&#xff1a; 一名在校大学生&#xff0c;正在努力学习Java语言编程。穷且意坚&#xff0c;不坠青云之…

sort排序报错:java.lang.UnsupportedOperationException: null

文章目录 问题原因解决方式 问题 Groovy 调用 .sort{} 排序报错:java.lang.UnsupportedOperationException: null solutionScenario2SolutionProcessList.sort { it.idx } 原因 调用的sort的对象是Collections的内部类对象UnmodifiableRandomAcessList 解决方式 调用 coll…

数据结构:单链表的实现(C语言)

个人主页 &#xff1a; 水月梦镜花 个人专栏 &#xff1a; 《C语言》 《数据结构》 文章目录 前言一、单链表实现思路和图解1.节点的定义(SListNode)2.申请一个节点(BuySListNode)3.单链表打印(SListPrint)4.单链表尾插(SListPushBack)5.单链表的头插(SListPushFront)6.单链表的…

【ChatGPT辅助学Rust | 基础系列 | Hello, Rust】编写并运行第一个Rust程序

文章目录 前言一&#xff0c;创建项目二&#xff0c;两种编译方式1. 使用rustc编译器编译2. 使用Cargo编译 总结 前言 在开始学习任何一门新的编程语言时&#xff0c;都会从编写一个简单的 “Hello, World!” 程序开始。在这一章节中&#xff0c;将会介绍如何在Rust中编写并运…

音频编辑必备技能:怎么将音频转换mp3

丽萨&#xff1a;嘿&#xff0c;听说你最近在研究音频格式转换的方法&#xff0c;有眉目了吗&#xff1f; 凯瑞&#xff1a;没错&#xff0c;我下载了很多高清音乐&#xff0c;发现有些格式的音频文件在我的播放器上打不开&#xff0c;所以想一个转换工具。但是网上软件太多&a…

使用腾讯云 Cloud studio 实现调度百度AI实现文字识别

文章目录 前言导入模块设置百度AI的APP_ID、API_KEY和SECRET_KEY定义路径和文件列表打开文本文件准备写入数据逐个处理图片文件关闭文本文件重复处理其他图片文件完整代码解释说明 运行效果 前言 今天我们也来高大上一下&#xff0c;玩一把人工智能。那就是免费调用百度AI实现图…

站外引流效果差?一文带你搞懂解海外主流社交媒体算法!

在流量成本越来越高的当下&#xff0c;无论是平台卖家还是独立站卖家都在努力拓展流量渠道。站外引流是推动业务增长的关键策略&#xff0c;很多卖家会把重点放在内容营销上&#xff0c;但其实除了做好内容之前&#xff0c;了解社交媒体的算法才能让营销效果最大化。 01.Faceb…

操作系统专栏2进程管理from 小林coding

进程管理 基本概念进程控制进程上下文切换 线程进程和线程的比较进程通信管道消息队列共享内存信号量信号socket 基本概念 进程:一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元.并行和并发:状态: 其中挂起是指没有给程序分配实际…

一百三十八、ClickHouse——使用clickhouse-backup备份ClickHouse库表

一、目标 使用clickhouse-backup在本地全库备份ClickHouse的数据库 二、前提 已经安装好clickhouse-backup 注意&#xff1a;由于之前同事已经按照好clickhouse-backup&#xff0c;所以我就没有安装 如有需要请参考其他人的博客安装一下&#xff0c;下面是我认为比较好的一…

如何看待前端已死这个问题(大学生篇)

小编刚大学毕业&#xff0c;还记得是大三的时候选择的前端开发方向&#xff0c;那个时候行情其实并没有这么差&#xff0c;最近互联网上讨论这一个很火的话题&#xff0c;叫前端已死。那么我就说说我的看法吧&#xff0c;虽然可能比起行业的大佬会比较短浅&#xff0c;但我想就…

盘点12个Vue 3的高颜值UI组件库

今天给大家盘点12个Vue 3的高颜值UI组件库&#xff0c;凡是用过Vue 框架开发项目的老铁们最少有用过其中一种或者二种以上的UI组件库&#xff0c;用广东话讲&#xff1a;个个都靓。 今天给大家盘点12个Vue 3的高颜值UI组件库&#xff0c;凡是用过Vue 框架开发项目的老铁们最少有…

【我们一起60天准备考研算法面试(大全)-第三十天 30/60】【矩阵翻转】【矩阵相乘】

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…