Zinx - V0.9 连接管理
- 每个服务器的能够处理的最大IO数量是有限的,根据当前服务器能开辟的IO数量决定,最终决定权是内存大小
- 现在我们要为Zinx框架增加链接个数的限定,如果超过⼀定量的客户端个数,Zinx为了保证后端的及时响应,⽽拒绝链接请求
实现思路
连接管理模块
- iconnmanager.go
package ziface
//连接管理模块抽象层
type IConnManager interface {
//添加链接
Add(conn IConneciton)
//删除连接
Remove(conn IConneciton)
//根据connID获取链接
Get(connID uint32) (IConneciton, error)
//得到当前连接总数
Len() int
//清除并终止所有d连接
ClearConn()
}
- connmanager.go
package znet
import (
"errors"
"fmt"
"sync"
"zinx/ziface"
)
//连接管理模块
type ConnManager struct {
connections map[uint32] ziface.IConneciton //管理的连接集合
connLock sync.RWMutex //保护连接集合的读写锁
}
//创建当前连接的方法
func NewConnManager() *ConnManager {
return &ConnManager{
connections:make(map[uint32] ziface.IConneciton),
}
}
//添加链接
func (connMgr *ConnManager) Add(conn ziface.IConneciton) {
//保护共享资源map, 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
//将conn加入到ConnManager中
connMgr.connections[conn.GetConnID()] = conn
fmt.Println("connID = ", conn.GetConnID()," add to ConnManager successfully: conn num = ", connMgr.Len())
}
//删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConneciton) {
//保护共享资源map, 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
//删除连接信息
delete(connMgr.connections, conn.GetConnID())
fmt.Println("connID = ", conn.GetConnID()," remove from ConnManager successfully: conn num = ", connMgr.Len())
}
//根据connID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConneciton, error) {
//保护共享资源map, 加读锁
connMgr.connLock.RLock()
defer connMgr.connLock.RUnlock()
if conn, ok := connMgr.connections[connID]; ok {
//找到了
return conn, nil
} else {
return nil, errors.New("connection not FOUND!")
}
}
//得到当前连接总数
func (connMgr *ConnManager) Len() int {
return len(connMgr.connections)
}
//清除并终止所有的连接
func (connMgr *ConnManager) ClearConn() {
//保护共享资源map, 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
//删除conn并停止conn的工作
for connID, conn := range connMgr.connections {
//停止
conn.Stop()
//删除
delete(connMgr.connections, connID)
}
fmt.Println("Clear All connections succ! conn num = ", connMgr.Len())
}
ConnManager 中,其中⽤⼀个map来承载全部的连接信息,key是连接ID,value则是连接本身。其中有⼀个读写锁 connLock 主要是针对map做多任务修改时的保护作⽤。Remove() ⽅法只是单纯的将conn从map中摘掉,⽽ ClearConn() ⽅法则会先停⽌链接业务, c.Stop() ,然后再从map中摘除
Zinx框架集成连接管理模块
- Server集成ConnManager
//iServer的接口实现,定义一个Server的服务器模块
type Server struct {
//服务器的名称
Name string
//服务器绑定的ip版本
IPVersion string
//服务器监听的IP
IP string
//服务器监听的端口
Port int
//当前server的消息管理模块, 用来绑定MsgID和对应的处理业务API关系
MsgHandler ziface.IMsgHandle
//该server的连接管理器
ConnMgr ziface.IConnManager
}
//初始化Server模块的方法
func NewServer(name string) ziface.IServer {
s := &Server{
Name: utils.GlobalObject.Name,
IPVersion: "tcp4",
IP: utils.GlobalObject.Host,
Port: utils.GlobalObject.TcpPort,
MsgHandler: NewMsgHandle(),
ConnMgr: NewConnManager(),
}
return s
}
//启动服务器
func (s *Server) Start() {
...
//3 阻塞的等待客户端链接,处理客户端链接业务(读写)
for {
//如果有客户端链接过来,阻塞会返回
conn, err := listenner.AcceptTCP()
if err != nil {
fmt.Println("Accept err", err)
continue
}
//设置最大连接个数的判断,如果超过最大连接,那么则关闭此新的连接
if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
//TODO 给客户端相应一个超出最大连接的错误包
fmt.Println("====> Too Many Connections MaxConn = ", utils.GlobalObject.MaxConn)
conn.Close()
continue
}
// 将处理新连接的业务方法 和 conn 进行绑定 得到我们的链接模块
dealConn := NewConnection(s, conn, cid, s.MsgHandler)
cid++
// 启动当前的链接业务处理
go dealConn.Start()
}
}()
}
//停止服务器
func (s *Server) Stop() {
//将一些服务器的资源、状态或者一些已经开辟的链接信息 进行停止或者回收
fmt.Println("[STOP] Zinx server name ", s.Name)
s.ConnMgr.ClearConn()
}
func (s *Server) GetConnMgr() ziface.IConnManager {
return s.ConnMgr
}
- Connection集成ConnManager
//链接模块
type Connection struct {
//当前Conn隶属于哪个Server
TcpServer ziface.IServer
//当前链接的socket TCP套接字
Conn *net.TCPConn
//链接的ID
ConnID uint32
//当前的链接状态
isClosed bool
//告知当前链接已经退出的/停止 channel(由Reader告知Writer退出)
ExitChan chan bool
//无缓冲d管道,用于读、写Goroutine之间的消息通信
msgChan chan []byte
//消息的管理MsgID 和对应的处理业务API关系
MsgHandler ziface.IMsgHandle
}
//初始化链接模块的方法
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
c := &Connection{
TcpServer: server,
Conn: conn,
ConnID: connID,
MsgHandler: msgHandler,
isClosed: false,
msgChan: make(chan []byte),
ExitChan: make(chan bool, 1),
}
//将conn加入到ConnMananger中
c.TcpServer.GetConnMgr().Add(c)
return c
}
//停止链接 结束当前链接的工作
func (c *Connection) Stop() {
fmt.Println("Conn Stop().. ConnID = ", c.ConnID)
//如果当前链接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
//关闭socket链接
c.Conn.Close()
//告知Writer关闭
c.ExitChan <- true
//将当前连接从ConnMgr中摘除掉
c.TcpServer.GetConnMgr().Remove(c)
//回收资源
close(c.ExitChan)
close(c.msgChan)
}
需要注意的是,在将conn加入到ConnManager这一步时,因为Connection结构体中没有定义connManager属性,所以我们的Connection模块并不能够所引到ConnManager,因此为了能够所引到ConnManager并且拥有良好的扩展性,我们让Connection和Server做一个关联,给Connection结构体添加一个parent属性,使得Connection隶属于某个Server,并在初始化Connection时将parent属性当做形参传入,这样就可以通过GetConnMgr将conn加入到ConnManager。因为Connection整体业务与连接管理器没什么关系,只是对连接的增删操作,所以当连接关闭的时候我们需要将连接从连接管理器中删除
自定义连接前后的业务
- 需求分析
有的时候,在创建链接的时候,希望在创建链接之后、和断开链接之前,执⾏⼀些⽤户⾃定义的业务。那么我们就需要给Zinx增添两个链接创建后和断开前时机的回调函数,⼀般也称作Hook(钩⼦)函数
- iserver.go
package ziface
//定义一个服务器接口
type IServer interface {
//启动服务器
Start()
//停止服务器
Stop()
//运行服务器
Serve()
//路由功能:给当前的服务注册一个路由方法,供客户端的链接处理使用
AddRouter(uint32, IRouter)
//获取当前server 的连接管理器
GetConnMgr() IConnManager
//注册OnConnStart 钩子函数的方法
SetOnConnStart(func(conneciton IConneciton))
//注册OnConnStop钩子函数的方法
SetOnConnStop(func(conneciton IConneciton))
//调用OnConnStart钩子函数的方法
CallOnConnStart(conneciton IConneciton)
//调用OnConnStop钩子函数的方法
CallOnConnStop(conneciton IConneciton)
}
- server.go
//iServer的接口实现,定义一个Server的服务器模块
type Server struct {
//服务器的名称
Name string
//服务器绑定的ip版本
IPVersion string
//服务器监听的IP
IP string
//服务器监听的端口
Port int
//当前server的消息管理模块, 用来绑定MsgID和对应的处理业务API关系
MsgHandler ziface.IMsgHandle
//该server的连接管理器
ConnMgr ziface.IConnManager
//该Server创建链接之后自动调用Hook函数--OnConnStart
OnConnStart func(conn ziface.IConneciton)
//该Server销毁链接之前自动调用的Hook函数--OnConnStop
OnConnStop func(conn ziface.IConneciton)
}
//注册OnConnStart 钩子函数的方法
func (s *Server) SetOnConnStart(hookFunc func(conneciton ziface.IConneciton)) {
s.OnConnStart = hookFunc
}
//注册OnConnStop钩子函数的方法
func (s *Server) SetOnConnStop(hookFunc func(conneciton ziface.IConneciton)) {
s.OnConnStop = hookFunc
}
//调用OnConnStart钩子函数的方法
func (s *Server) CallOnConnStart(conn ziface.IConneciton) {
if s.OnConnStart != nil {
fmt.Println("----> Call OnConnStart() ...")
s.OnConnStart(conn)
}
}
//调用OnConnStop钩子函数的方法
func (s *Server) CallOnConnStop(conn ziface.IConneciton) {
if s.OnConnStop != nil {
fmt.Println("---> Call OnConnStop() ...")
s.OnConnStop(conn)
}
}
-
选定start和stop的Hook方法调用位置 - connection.go
//启动链接 让当前的链接准备开始工作
func (c *Connection) Start() {
fmt.Println("Conn Start() ... ConnID = ", c.ConnID)
//启动从当前链接的读数据的业务
go c.StartReader()
//启动从当前链接写数据的业务
go c.StartWriter()
//按照开发者传递进来的 创建链接之后需要调用的处理业务,执行对应Hook函数
c.TcpServer.CallOnConnStart(c)
}
//停止链接 结束当前链接的工作
func (c *Connection) Stop() {
fmt.Println("Conn Stop().. ConnID = ", c.ConnID)
//如果当前链接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
//调用开发者注册的 销毁链接之前 需要执行的业务Hook函数
c.TcpServer.CallOnConnStop(c)
//关闭socket链接
c.Conn.Close()
//告知Writer关闭
c.ExitChan <- true
//将当前连接从ConnMgr中摘除掉
c.TcpServer.GetConnMgr().Remove(c)
//回收资源
close(c.ExitChan)
close(c.msgChan)
}
Zinx框架开发
...
//创建链接之后执行钩子函数
func DoConnectionBegin(conn ziface.IConneciton) {
fmt.Println("===> DoConnectionBegin is Called ... ")
if err := conn.SendMsg(202, []byte("DoConnection BEGIN")); err != nil {
fmt.Println(err)
}
}
//链接断开之前的需要执行的函数
func DoConnectionLost(conn ziface.IConneciton) {
fmt.Println("===> DoConnectionLost is Called ...")
fmt.Println("conn ID = ", conn.GetConnID(), " is Lost...")
}
func main() {
//1 创建一个server句柄,使用Zinx的api
s := znet.NewServer("[zinx V0.9]")
//2 注册链接Hook钩子函数
s.SetOnConnStart(DoConnectionBegin)
s.SetOnConnStop(DoConnectionLost)
//3 给当前zinx框架添加自定义的router
s.AddRouter(0, &PingRouter{})
s.AddRouter(1, &HelloZinxRouter{})
//4 启动server
s.Serve()
}
需要注意的是,DoConnectionBegin函数的名称实际上就是函数的地址,Server结构体中OnConnStart函数指针就指向DoConnectionBegin函数,我们在Server的CallOnConnStart方法中一调用s.OnConnStart方法就会执行开发者中写好的DoConnectionBegin业务