从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】
1 读写协程分离[v0.7]
- 添加一个Reader和Writer之间通信的channel
- 添加一个Writer goroutine
- Reader由之前直接发送给客户端改为发送给通信channel
- 启动Reader和Writer一起工作
zinx/znet/connection.go
package znet
import (
"fmt"
"github.com/kataras/iris/v12/x/errors"
"io"
"net"
)
type Connection struct {
Conn *net.TCPConn
ConnID uint32
isClosed bool
msgChannel chan []byte
//告知当前的连接已经退出/停止(由Reader告知writer退出)
ExitChan chan bool
MsgHandler *MsgHandle
}
func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
MsgHandler: msgHandle,
isClosed: false,
msgChannel: make(chan []byte),
ExitChan: make(chan bool, 1),
}
return c
}
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println("[conn Writer goroutine exit!]", c.RemoteAddr().String())
//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端
for {
select {
case data := <-c.msgChannel:
//有数据写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error , ", err)
return
}
case <-c.ExitChan:
//代表reader已经退出,此时writer也需要退出
return
}
}
}
func (c *Connection) StartReader() {
fmt.Println("reader goroutine is running...")
defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())
defer c.Stop()
//读取数据
for {
//创建一个拆包对象
dp := NewDataPack()
//读取客户端的msg Head 二进制流 8字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head err ", err)
break
}
//拆包,将读取到的headData封装为msg
msg, err := dp.UnPack(headData)
if err != nil {
fmt.Println("unpack msg err ", err)
break
}
//根据dataLen,再次读取Data,放在msg.Data中,
var data []byte
//如果数据包中有数据,则读取
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
//将切片data读满
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data err ", err)
break
}
}
msg.SetData(data)
//封装请求,改为router处理
r := Request{
conn: c,
msg: msg,
}
go c.MsgHandler.DoMsgHandler(&r)
}
}
//启动连接
func (c *Connection) Start() {
fmt.Printf("ConnID %d is Start...", c.ConnID)
//开启读、写
go c.StartReader()
go c.StartWriter()
}
//停止连接
func (c *Connection) Stop() {
fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
if c.isClosed {
return
}
c.isClosed = true
c.Conn.Close()
c.ExitChan <- true
close(c.msgChannel)
close(c.ExitChan)
}
//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
return c.ConnID
}
//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("connection closed\n")
}
//将data进行封包
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id=", msgId)
return errors.New("pack error msg")
}
//将数据发送给客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("write msg id ", msgId, " error ", err)
return errors.New("conn write err ")
}
return nil
}
测试
myDemo/ZinxV0.7/client.go
- client0.go
package main
import (
"fmt"
"io"
"myTest/zinx/znet"
"net"
"time"
)
/*
模拟客户端
*/
func main() {
fmt.Println("client start...")
time.Sleep(time.Second * 1)
//1 创建服务器连接
conn, err := net.Dial("tcp", "127.0.0.1:8092")
if err != nil {
fmt.Println("client start err ", err)
return
}
for {
//发送封装后的数据包
dp := znet.NewDataPack()
binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))
if err != nil {
fmt.Println("client pack msg err ", err)
return
}
if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("client write err ", err)
return
}
//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...
//1 先读取流中的head部分,得到Id和dataLen
binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("client read head err ", err)
break
}
//将二进制的head拆包到msg中
msgHead, err := dp.UnPack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead err ", err)
break
}
if msgHead.GetMsgLen() > 0 {
//2 有数据, 再根据dataLen进行二次读取,将data读出来
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error ", err)
return
}
fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
}
//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
time.Sleep(time.Second * 1)
}
}
- client1.go
myDemo/ZinxV0.7/server.go
package main
import (
"fmt"
"myTest/zinx/ziface"
"myTest/zinx/znet"
)
//自定义一个Router,测试路由功能
type PingRouter struct {
znet.BaseRouter
}
func (pr *PingRouter) Handler(request ziface.IRequest) {
fmt.Println("call router handler...")
//先读取客户端数据,再回写ping...ping...ping...
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
//回写ping
err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))
if err != nil {
fmt.Println(err)
}
}
//定义第二个Router
type HelloRouter struct {
znet.BaseRouter
}
func (hr *HelloRouter) Handler(request ziface.IRequest) {
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))
if err != nil {
fmt.Println(err)
}
}
func main() {
s := znet.NewServer("[Zinx v0.7]")
//添加自定义路由(PingRouter和HelloRouter)
router0 := &PingRouter{}
s.AddRouter(0, router0)
router1 := &HelloRouter{}
s.AddRouter(1, router1)
s.Serve()
}
结果:
- 接受多个客户端也可以
- 当client0退出时,不会影响client1
2 创建消息队列及多任务[v0.8]
- 创建一个消息队列,MsgHandler消息管理模块增加:TaskQueue、WorkerPoolSize
- 创还能多任务worker的工作池并且启动
- 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理
实现消息队列机制和工作池机制(集成到自定义框架)
- 创建一个消息队列:MsgHandler消息管理模块
- 创建多任务worker的工作池并启动
- 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理
- 将消息队列机制集成到Zinx框架中
- 开启并调用消息队列及worker工作池
- 将从客户端处理的消息,发送给当前Worker的工作池来处理
zinx/znet/server.go
package znet
import (
"fmt"
"myTest/zinx/util"
"myTest/zinx/ziface"
"net"
)
type Server struct {
Name string
IPVersion string
IP string
Port int
MsgHandler *MsgHandle
}
func NewServer(name string) *Server {
s := &Server{
Name: name,
IPVersion: "tcp4",
IP: util.GlobalObject.Host,
Port: util.GlobalObject.TcpPort,
MsgHandler: NewMsgHandle(),
}
return s
}
func (s *Server) Start() {
//启动服务监听端口
fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)
fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)
var cid uint32 = 0
go func() {
//0 开启消息队列及Worker工作池
s.MsgHandler.StartWorkerPool()
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Printf("resolve tcp addr error %v\n", err)
return
}
listener, err := net.ListenTCP(s.IPVersion, addr)
if err != nil {
fmt.Println("listen ", s.IPVersion, " err ", err)
return
}
fmt.Println("[start] Zinx server success ", s.Name, "Listening...")
//阻塞连接,处理业务
for {
conn, err := listener.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
dealConn := NewConnection(conn, cid, s.MsgHandler)
cid++
//开启goroutine处理启动当前conn
go dealConn.Start()
}
}()
}
func (s *Server) Stop() {
}
func (s *Server) Serve() {
s.Start()
//阻塞,一直读取客户端所发送过来的消息
select {}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {
s.MsgHandler.AddRouter(msgId, router)
}
zinx/znet/connection.go
package znet
import (
"fmt"
"github.com/kataras/iris/v12/x/errors"
"io"
"myTest/zinx/util"
"net"
)
type Connection struct {
Conn *net.TCPConn
ConnID uint32
isClosed bool
msgChannel chan []byte
//告知当前的连接已经退出/停止(由Reader告知writer退出)
ExitChan chan bool
MsgHandler *MsgHandle
}
func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
MsgHandler: msgHandle,
isClosed: false,
msgChannel: make(chan []byte),
ExitChan: make(chan bool, 1),
}
return c
}
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println("[conn Writer goroutine exit!]", c.RemoteAddr().String())
//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端
for {
select {
case data := <-c.msgChannel:
//有数据写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error , ", err)
return
}
case <-c.ExitChan:
//代表reader已经退出,此时writer也需要退出
return
}
}
}
func (c *Connection) StartReader() {
fmt.Println("reader goroutine is running...")
defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())
defer c.Stop()
//读取数据
for {
//创建一个拆包对象
dp := NewDataPack()
//读取客户端的msg Head 二进制流 8字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head err ", err)
break
}
//拆包,将读取到的headData封装为msg
msg, err := dp.UnPack(headData)
if err != nil {
fmt.Println("unpack msg err ", err)
break
}
//根据dataLen,再次读取Data,放在msg.Data中,
var data []byte
//如果数据包中有数据,则读取
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
//将切片data读满
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data err ", err)
break
}
}
msg.SetData(data)
//封装请求,改为router处理
r := Request{
conn: c,
msg: msg,
}
//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理
if util.GlobalObject.WorkerPoolSize > 0 {
c.MsgHandler.SendMsgToTaskQueue(&r)
} else {
go c.MsgHandler.DoMsgHandler(&r)
}
}
}
//启动连接
func (c *Connection) Start() {
fmt.Printf("ConnID %d is Start...", c.ConnID)
//开启读、写
go c.StartReader()
go c.StartWriter()
}
//停止连接
func (c *Connection) Stop() {
fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
if c.isClosed {
return
}
c.isClosed = true
c.Conn.Close()
c.ExitChan <- true
close(c.msgChannel)
close(c.ExitChan)
}
//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
return c.ConnID
}
//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("connection closed\n")
}
//将data进行封包
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id=", msgId)
return errors.New("pack error msg")
}
//将数据发送给客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("write msg id ", msgId, " error ", err)
return errors.New("conn write err ")
}
return nil
}
zinx/znet/msgHandler.go
package znet
import (
"fmt"
"myTest/zinx/util"
"myTest/zinx/ziface"
"strconv"
)
type MsgHandle struct {
//msgId与对应的router对应
Api map[uint32]ziface.IRouter
//负责worker取任务的消息队列
TaskQueue []chan ziface.IRequest
//业务工作worker池的goroutine数量
WorkerPoolSize uint32
}
func NewMsgHandle() *MsgHandle {
return &MsgHandle{
Api: make(map[uint32]ziface.IRouter),
TaskQueue: make([]chan ziface.IRequest, util.GlobalObject.WorkerPoolSize),
WorkerPoolSize: util.GlobalObject.WorkerPoolSize,
}
}
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
//判断是否有对应的router
if _, ok := mh.Api[request.GetMsgID()]; !ok {
fmt.Println("msgId ", request.GetMsgID(), "does not exist handler, need to add router")
return
}
//call handler
router := mh.Api[request.GetMsgID()]
router.PreHandle(request)
router.Handler(request)
router.PostHandler(request)
}
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
if _, ok := mh.Api[msgId]; ok {
//如果已经存在了对应的router,则提示
panic("repeat api, msgId = " + strconv.Itoa(int(msgId)))
}
mh.Api[msgId] = router
fmt.Println("msgId ", msgId, "Add router success ")
}
//启动一个worker工作池(开启工作池的动作只能发生一次,一个zinx框架只能有一个worker工作池)
func (mh *MsgHandle) StartWorkerPool() {
for i := 0; i < int(mh.WorkerPoolSize); i++ {
//开辟任务队列
mh.TaskQueue[i] = make(chan ziface.IRequest, util.GlobalObject.MaxWorkerTaskLen)
//启动worker
go mh.startOneWorker(i, mh.TaskQueue[i])
}
}
func (mh *MsgHandle) startOneWorker(workerId int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID=", workerId, " is started...")
for {
select {
//从任务队列中取消息(如果有消息过来,出列的就是request,然后执行该request所绑定的业务)
case request := <-taskQueue:
mh.DoMsgHandler(request)
}
}
}
//将消息交给taskQueue,由Worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
//通过取余数的方式来达到负载均衡
workID := request.GetConnection().GetConnectionID() % util.GlobalObject.WorkerPoolSize
fmt.Println("Add ConnID=", request.GetConnection().GetConnectionID(),
" requestID=", request.GetMsgID(),
" workID=", workID)
//将消息发送给对应worker的任务队列
mh.TaskQueue[workID] <- request
}
zinx/ziface/imsgHandler.go
package ziface
type IMsgHandler interface {
DoMsgHandler(request IRequest)
AddRouter(msgId uint32, router IRouter)
StartWorkerPool()
SendMsgToTaskQueue(request IRequest)
}
测试
myDemo/ZinxV0.8/Server.go
同myDemo/ZinxV0.7/Server.go,修改一下NewServer时候所传的Zinx的名称即可
myDemo/ZinxV0.8/Client.go
同myDemo/ZinxV0.7/Client.go
myDemo/ZinxV0.8/zinx.json
{
"Name": "Zinx Server Application",
"Version": "V0.8",
"Host": "0.0.0.0",
"TcpPort": 8092,
"MaxConn": 30,
"MaxPackageSize": 1024,
"WorkerPoolSize": 10
}
3 连接管理器(connManager)[v0.9]
3.1 连接管理器(conn)的定义与实现
创建一个连接管理模块ConnManager
- 添加连接
- 删除连接
- 根据连接ID查找对应的连接
- 总连接个数
- 清理全部的连接
3.2 将连接管理模块集成到Zinx框架中
- 给server添加一个ConnMgr属性
- 修改NewServer方法,加入ConnMgr初始化
- 判断当前连接数是否超出最大值MaxConn
- 当server停止的时候(调用server.Stop方法),应该加入ConnMgr.ClearConn()
3.3 提供创建连接/销毁连之前所需的Hook函数
给我们自定义框架Zinx提供创建连接之后/销毁连接之前所要处理的一些业务。提供给用户能够注册的Hook函数
- 添加OnConnStart()
- 添加OnConnStop()
zinx/ziface/iserver.go
package ziface
type IServer interface {
Start()
Stop()
Serve()
AddRouter(msgId uint32, router IRouter)
GetConnMgr() IConnManager
//注册创OnConnStart钩子函数
SetOnConnStart(func(conn IConnection))
SetOnConnStop(func(conn IConnection))
//调用OnConnStart钩子函数
CallOnConnStart(conn IConnection)
CallOnConnStop(conn IConnection)
}
zinx/ziface/iconnmanager.go
package ziface
type IConnManager interface {
Add(conn IConnection)
Remove(conn IConnection)
Get(connID uint32) (IConnection, error)
Len() int
ClearConn()
}
zinx/znet/connmanager.go
package znet
import (
"fmt"
"github.com/kataras/iris/v12/x/errors"
"myTest/zinx/util"
"myTest/zinx/ziface"
"sync"
)
type ConnManager struct {
connections map[uint32]ziface.IConnection //管理的连接集合
connLock sync.RWMutex //保护连接集合的读写锁
}
func NewConnManager() *ConnManager {
return &ConnManager{
connections: make(map[uint32]ziface.IConnection, util.GlobalObject.MaxConn),
}
}
func (cm *ConnManager) Add(conn ziface.IConnection) {
//添加写锁
cm.connLock.Lock()
defer cm.connLock.Unlock()
cm.connections[conn.GetConnectionID()] = conn
fmt.Println("connectionID=", conn.GetConnectionID(), " add to ConnManager success, conn num=", cm.Len())
}
func (cm *ConnManager) Remove(conn ziface.IConnection) {
//保护共享资源map
cm.connLock.Lock()
defer cm.connLock.Unlock()
delete(cm.connections, conn.GetConnectionID())
fmt.Println("connectionID=", conn.GetConnectionID(), " remote from ConnManager success, conn num=", cm.Len())
}
func (cm *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
cm.connLock.RLock()
defer cm.connLock.RUnlock()
if conn, ok := cm.connections[connID]; ok {
return conn, nil
} else {
return nil, errors.New("connection NOT FOUND")
}
}
func (cm *ConnManager) Len() int {
return len(cm.connections)
}
func (cm *ConnManager) ClearConn() {
cm.connLock.Lock()
defer cm.connLock.Unlock()
for connID, conn := range cm.connections {
//停止连接
conn.Stop()
//删除连接
delete(cm.connections, connID)
}
fmt.Println("Clear All connections success! conn num=", cm.Len())
}
zinx/znet/connection.go
package znet
import (
"fmt"
"github.com/kataras/iris/v12/x/errors"
"io"
"myTest/zinx/util"
"myTest/zinx/ziface"
"net"
)
type Connection struct {
Conn *net.TCPConn
ConnID uint32
isClosed bool
msgChannel chan []byte
//告知当前的连接已经退出/停止(由Reader告知writer退出)
ExitChan chan bool
MsgHandler *MsgHandle
TcpServer ziface.IServer
}
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
MsgHandler: msgHandle,
isClosed: false,
msgChannel: make(chan []byte),
ExitChan: make(chan bool, 1),
TcpServer: server,
}
//将conn添加到connMgr中
c.TcpServer.GetConnMgr().Add(c)
return c
}
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println("[conn Writer goroutine exit!]", c.RemoteAddr().String())
//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端
for {
select {
case data := <-c.msgChannel:
//有数据写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error , ", err)
return
}
case <-c.ExitChan:
//代表reader已经退出,此时writer也需要退出
return
}
}
}
func (c *Connection) StartReader() {
fmt.Println("reader goroutine is running...")
defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())
defer c.Stop()
//读取数据
for {
//创建一个拆包对象
dp := NewDataPack()
//读取客户端的msg Head 二进制流 8字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head err ", err)
break
}
//拆包,将读取到的headData封装为msg
msg, err := dp.UnPack(headData)
if err != nil {
fmt.Println("unpack msg err ", err)
break
}
//根据dataLen,再次读取Data,放在msg.Data中,
var data []byte
//如果数据包中有数据,则读取
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
//将切片data读满
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data err ", err)
break
}
}
msg.SetData(data)
//封装请求,改为router处理
r := Request{
conn: c,
msg: msg,
}
//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理
if util.GlobalObject.WorkerPoolSize > 0 {
c.MsgHandler.SendMsgToTaskQueue(&r)
} else {
go c.MsgHandler.DoMsgHandler(&r)
}
}
}
//启动连接
func (c *Connection) Start() {
fmt.Printf("ConnID %d is Start...", c.ConnID)
//开启读、写
go c.StartReader()
go c.StartWriter()
//执行钩子函数
c.TcpServer.CallOnConnStart(c)
}
//停止连接
func (c *Connection) Stop() {
fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
if c.isClosed {
return
}
c.isClosed = true
//连接关闭之前执行hook关闭的钩子函数
c.TcpServer.CallOnConnStop(c)
c.Conn.Close()
c.ExitChan <- true
//连接conn关闭时,需要从连接管理模块中移除
c.TcpServer.GetConnMgr().Remove(c)
close(c.msgChannel)
close(c.ExitChan)
}
//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
return c.ConnID
}
//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("connection closed\n")
}
//将data进行封包
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id=", msgId)
return errors.New("pack error msg")
}
//将数据发送给客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("write msg id ", msgId, " error ", err)
return errors.New("conn write err ")
}
return nil
}
zinx/znet/server.go
package znet
import (
"fmt"
"myTest/zinx/util"
"myTest/zinx/ziface"
"net"
)
type Server struct {
Name string
IPVersion string
IP string
Port int
MsgHandler *MsgHandle
ConnMgr *ConnManager
//创建连接之前的Hook函数
OnConnStart func(conn ziface.IConnection)
OnConnStop func(conn ziface.IConnection)
}
func NewServer(name string) *Server {
s := &Server{
Name: name,
IPVersion: "tcp4",
IP: util.GlobalObject.Host,
Port: util.GlobalObject.TcpPort,
MsgHandler: NewMsgHandle(),
ConnMgr: NewConnManager(),
}
return s
}
func (s *Server) Start() {
//启动服务监听端口
fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)
fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)
var cid uint32 = 0
go func() {
//0 开启消息队列及Worker工作池
s.MsgHandler.StartWorkerPool()
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Printf("resolve tcp addr error %v\n", err)
return
}
listener, err := net.ListenTCP(s.IPVersion, addr)
if err != nil {
fmt.Println("listen ", s.IPVersion, " err ", err)
return
}
fmt.Println("[start] Zinx server success ", s.Name, "Listening...")
//阻塞连接,处理业务
for {
conn, err := listener.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
//判断当前连接数是否超过最大连接数,如果超过则关闭新创建的连接
if s.ConnMgr.Len() >= util.GlobalObject.MaxConn {
//TODO 给客户端返回一个超出最大连接的错误包
fmt.Println("-----------------》 Tcp Conn exceed, conn num=", util.GlobalObject.MaxConn)
conn.Close()
//关闭当前连接,等待下一次连接【如果当前连接数小于最大连接数】
continue
}
dealConn := NewConnection(s, conn, cid, s.MsgHandler)
cid++
//开启goroutine处理启动当前conn
go dealConn.Start()
}
}()
}
func (s *Server) Stop() {
//释放相关资源
fmt.Println("[STOP] Zinx server name ", s.Name)
s.ConnMgr.ClearConn()
}
func (s *Server) Serve() {
s.Start()
//阻塞,一直读取客户端所发送过来的消息
select {}
}
func (s *Server) GetConnMgr() ziface.IConnManager {
return s.ConnMgr
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {
s.MsgHandler.AddRouter(msgId, router)
}
//注册创OnConnStart钩子函数
func (s *Server) SetOnConnStart(hookFunc func(conn ziface.IConnection)) {
s.OnConnStart = hookFunc
}
func (s *Server) SetOnConnStop(hookFunc func(conn ziface.IConnection)) {
s.OnConnStop = hookFunc
}
//调用OnConnStart钩子函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
if s.OnConnStart != nil {
fmt.Println("---------> call OnConnStart()")
s.OnConnStart(conn)
}
}
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
if s.OnConnStop != nil {
fmt.Println("----------> call OnConnStop()")
s.OnConnStop(conn)
}
}
测试
myDemo/ZinxV0.9/Server.go
package main
import (
"fmt"
"myTest/zinx/ziface"
"myTest/zinx/znet"
)
//自定义一个Router,测试路由功能
type PingRouter struct {
znet.BaseRouter
}
func (pr *PingRouter) Handler(request ziface.IRequest) {
fmt.Println("call router handler...")
//先读取客户端数据,再回写ping...ping...ping...
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
//回写ping
err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))
if err != nil {
fmt.Println(err)
}
}
//定义第二个Router
type HelloRouter struct {
znet.BaseRouter
}
func (hr *HelloRouter) Handler(request ziface.IRequest) {
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))
if err != nil {
fmt.Println(err)
}
}
//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {
fmt.Println("=====>Do Conn Begin...")
if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {
fmt.Println("err")
}
}
//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {
fmt.Println("=====>Do Conn Lost...")
fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")
}
func main() {
s := znet.NewServer("[Zinx v0.9]")
//添加自定义路由(PingRouter和HelloRouter)
router0 := &PingRouter{}
s.AddRouter(0, router0)
router1 := &HelloRouter{}
s.AddRouter(1, router1)
//注册hook钩子函数
s.SetOnConnStart(DoConnBegin)
s.SetOnConnStop(DoConnLost)
s.Serve()
}
测试代码中的myDemo/ZinxV0.9/Client.go和myDemo/ZinxV0.8/Client.go一样。
- 为了方便测试超过最大连接数的报错信息,我们可以修改配置文件
//将最大连接数设置为2,然后我们复制Client.go,可以多起几个Client来进行测试
{
"Name": "Zinx Server Application",
"Version": "V0.9",
"Host": "0.0.0.0",
"TcpPort": 8092,
"MaxConn": 2,
"MaxPackageSize": 1024,
"WorkerPoolSize": 10
}
测试最大连接数与连接管理:
测试钩子函数:
4 添加连接属性并测试【v0.10】
通过map[string]interface{}来存储连接的属性值,通过RWLock来保证读写connection属性值安全
- 设置连接属性
- 获取连接属性
- 移除连接属性
zinx/ziface/iconnection.go
package ziface
import "net"
type IConnection interface {
//启动连接
Start()
//停止连接
Stop()
//获取当前连接的Conn对象
GetTCPConnection() *net.TCPConn
//获取当前连接模块的id
GetConnectionID() uint32
//获取远程客户端的TCP状态 IP:Port
RemoteAddr() net.Addr
//发送数据
SendMsg(msgId uint32, data []byte) error
SetProperty(key string, value interface{})
GetProperty(key string) (interface{}, error)
RemoveProperty(key string)
}
//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error
zinx/znet/connection.go
package znet
import (
"fmt"
"github.com/kataras/iris/v12/x/errors"
"io"
"myTest/zinx/util"
"myTest/zinx/ziface"
"net"
"sync"
)
type Connection struct {
Conn *net.TCPConn
ConnID uint32
isClosed bool
msgChannel chan []byte
//告知当前的连接已经退出/停止(由Reader告知writer退出)
ExitChan chan bool
MsgHandler *MsgHandle
TcpServer ziface.IServer
property map[string]interface{}
propertyLock sync.RWMutex
}
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
MsgHandler: msgHandle,
isClosed: false,
msgChannel: make(chan []byte),
ExitChan: make(chan bool, 1),
TcpServer: server,
property: make(map[string]interface{}),
}
//将conn添加到connMgr中
c.TcpServer.GetConnMgr().Add(c)
return c
}
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println("[conn Writer goroutine exit!]", c.RemoteAddr().String())
//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端
for {
select {
case data := <-c.msgChannel:
//有数据写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error , ", err)
return
}
case <-c.ExitChan:
//代表reader已经退出,此时writer也需要退出
return
}
}
}
func (c *Connection) StartReader() {
fmt.Println("reader goroutine is running...")
defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())
defer c.Stop()
//读取数据
for {
//创建一个拆包对象
dp := NewDataPack()
//读取客户端的msg Head 二进制流 8字节
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head err ", err)
break
}
//拆包,将读取到的headData封装为msg
msg, err := dp.UnPack(headData)
if err != nil {
fmt.Println("unpack msg err ", err)
break
}
//根据dataLen,再次读取Data,放在msg.Data中,
var data []byte
//如果数据包中有数据,则读取
if msg.GetMsgLen() > 0 {
data = make([]byte, msg.GetMsgLen())
//将切片data读满
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data err ", err)
break
}
}
msg.SetData(data)
//封装请求,改为router处理
r := Request{
conn: c,
msg: msg,
}
//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理
if util.GlobalObject.WorkerPoolSize > 0 {
c.MsgHandler.SendMsgToTaskQueue(&r)
} else {
go c.MsgHandler.DoMsgHandler(&r)
}
}
}
//启动连接
func (c *Connection) Start() {
fmt.Printf("ConnID %d is Start...", c.ConnID)
//开启读、写
go c.StartReader()
go c.StartWriter()
//执行钩子函数
c.TcpServer.CallOnConnStart(c)
}
//停止连接
func (c *Connection) Stop() {
fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)
if c.isClosed {
return
}
c.isClosed = true
//连接关闭之前执行hook关闭的钩子函数
c.TcpServer.CallOnConnStop(c)
c.Conn.Close()
c.ExitChan <- true
//连接conn关闭时,需要从连接管理模块中移除
c.TcpServer.GetConnMgr().Remove(c)
close(c.msgChannel)
close(c.ExitChan)
}
//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {
return c.ConnID
}
//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed {
return errors.New("connection closed\n")
}
//将data进行封包
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id=", msgId)
return errors.New("pack error msg")
}
//将数据发送给客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("write msg id ", msgId, " error ", err)
return errors.New("conn write err ")
}
return nil
}
func (c *Connection) SetProperty(key string, value interface{}) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
c.property[key] = value
}
func (c *Connection) GetProperty(key string) (interface{}, error) {
c.propertyLock.RLock()
defer c.propertyLock.RUnlock()
if value, ok := c.property[key]; ok {
return value, nil
} else {
return nil, errors.New("no property found")
}
}
func (c *Connection) RemoveProperty(key string) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
delete(c.property, key)
}
测试
myDemo/ZinxV0.10/Server.go
package main
import (
"fmt"
"myTest/zinx/ziface"
"myTest/zinx/znet"
)
//自定义一个Router,测试路由功能
type PingRouter struct {
znet.BaseRouter
}
func (pr *PingRouter) Handler(request ziface.IRequest) {
fmt.Println("call router handler...")
//先读取客户端数据,再回写ping...ping...ping...
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
//回写ping
err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))
if err != nil {
fmt.Println(err)
}
}
//定义第二个Router
type HelloRouter struct {
znet.BaseRouter
}
func (hr *HelloRouter) Handler(request ziface.IRequest) {
fmt.Println("receive from client msgId=", request.GetMsgID(),
"data=", string(request.GetData()))
err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))
if err != nil {
fmt.Println(err)
}
}
//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {
fmt.Println("=====>Do Conn Begin...")
if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {
fmt.Println("err")
}
//给conn设置属性
conn.SetProperty("Name", "ziyi")
conn.SetProperty("士兵突击", "https://www.bilibili.com/video/BV1Lk4y1N7tC/")
}
//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {
fmt.Println("=====>Do Conn Lost...")
fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")
//读取属性
property, _ := conn.GetProperty("Name")
fmt.Println("Get Property Name=", property)
property, _ = conn.GetProperty("士兵突击")
fmt.Println("Get Property 士兵突击=", property)
}
func main() {
s := znet.NewServer("[Zinx v0.10]")
//添加自定义路由(PingRouter和HelloRouter)
router0 := &PingRouter{}
s.AddRouter(0, router0)
router1 := &HelloRouter{}
s.AddRouter(1, router1)
//注册hook钩子函数
s.SetOnConnStart(DoConnBegin)
s.SetOnConnStop(DoConnLost)
s.Serve()
}
myDemo/ZinxV0.10/Client.go
package main
import (
"fmt"
"io"
"myTest/zinx/znet"
"net"
"time"
)
/*
模拟客户端
*/
func main() {
fmt.Println("client start...")
time.Sleep(time.Second * 1)
//1 创建服务器连接
conn, err := net.Dial("tcp", "127.0.0.1:8092")
if err != nil {
fmt.Println("client start err ", err)
return
}
for {
//发送封装后的数据包
dp := znet.NewDataPack()
binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))
if err != nil {
fmt.Println("client pack msg err ", err)
return
}
if _, err := conn.Write(binaryMsg); err != nil {
fmt.Println("client write err ", err)
return
}
//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...
//1 先读取流中的head部分,得到Id和dataLen
binaryHead := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(conn, binaryHead); err != nil {
fmt.Println("client read head err ", err)
break
}
//将二进制的head拆包到msg中
msgHead, err := dp.UnPack(binaryHead)
if err != nil {
fmt.Println("client unpack msgHead err ", err)
break
}
if msgHead.GetMsgLen() > 0 {
//2 有数据, 再根据dataLen进行二次读取,将data读出来
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetMsgLen())
if _, err := io.ReadFull(conn, msg.Data); err != nil {
fmt.Println("read msg data error ", err)
return
}
fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))
}
//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片
time.Sleep(time.Second * 1)
}
}