目录
- Day5-Part3:Zinx 的连接管理
- 创建连接管理模块
- 将连接管理模块集成到 Zinx 当中
- 将 ConnManager 集成到 Server 当中
- 在 Connection 的工厂函数中将连接添加到 ConnManager
- Server 中连接数量的判断
- 连接的删除
- 补充:连接的带缓冲发包方式
- 补充:注册连接启动/停止时自定义的 hook 方法
Day5-Part3:Zinx 的连接管理
现在我们要为 Zinx 框架增加连接个数的限定,如果与 Server 相连的 Client 超过了一定的个数,Zinx 为了保证后端的及时响应,应拒绝连接请求。
创建连接管理模块
我们分别在 ziface
和 znet
下建立 iconnmanager.go
和 connmanager.go
。
首先在 iconnmanager.go
当中定义接口:
package ziface
type IConnManager interface {
Add(conn IConnection) // 添加连接
Remove(conn IConnection) // 删除连接
Get(connID uint32) (IConnection, error) // 利用 ConnID 获取连接
Len() int // 获取当前连接数量
ClearConn() // 删除并停止所有连接
}
之后我们在 connmanager.go
当中定义连接管理模块 ConnManager,并使其实现 IConnManager 接口:
package znet
import (
"errors"
"fmt"
"sync"
"zinx/ziface"
)
type ConnManager struct {
connections map[uint32]ziface.IConnection // 管理连接的信息
connLock sync.RWMutex // 读写连接的读写锁
}
func NewConnManager() *ConnManager {
return &ConnManager{
connections: make(map[uint32]ziface.IConnection),
}
}
// Add 添加连接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {
// 保护共享资源 Map, 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
// 将 conn 连接添加到 ConnManager
connMgr.connections[conn.GetConnID()] = conn
fmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}
// Remove 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
// 保护共享资源 map, 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
// 删除连接信息
delete(connMgr.connections, conn.GetConnID())
fmt.Println("connection Remove connID = ", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}
// Get 利用 ConnID 获取连接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
// 保护共享资源 Map, 加读锁
connMgr.connLock.RLock()
defer connMgr.connLock.RUnlock()
if conn, ok := connMgr.connections[connID]; ok {
return conn, nil
} else {
return nil, errors.New("connection not found")
}
}
// Len 获取当前连接个数
func (connMgr *ConnManager) Len() int {
return len(connMgr.connections)
}
// ClearConn 停止并清除当前所有连接
func (connMgr *ConnManager) ClearConn() {
// 保护共享资源 Map, 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
// 停止并删除全部的连接信息
for connID, conn := range connMgr.connections {
// 停止
conn.Stop()
// 删除
delete(connMgr.connections, connID)
}
fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}
在 ConnManager
中,用一个 map 来承载全部的连接信息,key 是 ConnID,它在 Server 的 Start 方法中,调用 NewConnection 之前确定,而 value 就是对连接的封装 Connection 本身。ConnManager 中还定义了一个 connLock,它的作用是对 map 在多任务修改场景下进行并发保护。
Remove()
方法用于删除指定的连接,而ClearConn()
则会首先停止所有的连接,再删除,应当在服务器停止前调用。
将连接管理模块集成到 Zinx 当中
将 ConnManager 集成到 Server 当中
现在我们先将 ConnManager 集成到 Server 当中,作为 Server 的成员:
type Server struct {
Name string // Name 为服务器的名称
IPVersion string // IPVersion: IPv4 or other
IP string // IP: 服务器绑定的 IP 地址
Port int // Port: 服务器绑定的端口
msgHandler ziface.IMsgHandle // 将 Router 替换为 MsgHandler, 绑定 MsgId 与对应的处理方法
ConnMgr ziface.IConnManager // 当前 Server 的连接管理器
}
// NewServer 将创建一个服务器的 Handler
func NewServer() ziface.IServer {
s := &Server{
Name: settings.Conf.Name,
IPVersion: "tcp4",
IP: settings.Conf.Host,
Port: settings.Conf.Port,
msgHandler: NewMsgHandle(),
ConnMgr: NewConnManager(),
}
return s
}
既然 Server 具备了 ConnManager 成员,我们为其设置一个方法,用于获取 ConnManager 对象(通过成员函数来获取类内的成员):
// zinx/ziface/iserver.go
// 定义服务器接口
type IServer interface {
Start() // Start 启动服务器方法
Stop() // Stop 停止服务器方法
Serve() // Serve 开启服务器方法
AddRouter(msgId uint32, router IRouter) // 路由功能: 给当前服务注册一个路由业务方法
GetConnMgr() IConnManager // 得到连接管理器
}
// zinx/znet/server.go
func (s *Server) GetConnMgr() ziface.IConnManager {
return s.ConnMgr
}
我们可能会需要在 Connection 当中使用 ConnManager,毕竟 ConnManager 是对连接进行管理的模块,我们现在需要一种方法来使得 ConnManager 对 Connection 可见,Zinx 教程中采取的做法是将 Server 作为 Connection 成员的一部分,从而使得 Server 和 Connection 成为互相引用的关系,即:为 Connection 添加一个 Server 成员,使得 Connection 知晓自己隶属于哪个 Server 的管辖下:
type Connection struct {
TCPServer ziface.IServer // 标记当前 Conn 属于哪个 Server
Conn *net.TCPConn // 当前连接的 socket TCP 套接字
ConnID uint32 // 当前连接的 ID, 也可称为 SessionID, 全局唯一
isClosed bool // 当前连接的开启/关闭状态
Msghandler ziface.IMsgHandle // 将 Router 替换为消息管理模块
ExitBuffChan chan bool // 告知该连接一经退出/停止的 channel
msgChan chan []byte // 无缓冲 channel, 用于读/写两个 goroutine 之间的消息通信
msgBuffChan chan []byte // 定义 msgBuffChan
}
当然,传递进来的是 Server 的指针,不可能在服务器应用的运行时再复制一个 Server 的对象。可以这样理解,Connection 应该对其所属 Server 的指针具有访问权。
在 Connection 的工厂函数中将连接添加到 ConnManager
在 NewConnection
工厂函数调用时,由于 Connection 对象对其隶属的 Server 可见,直接调用 Server 的 GetConnMgr 方法并使用 ConnManager 的 Add 方法将 Connection 添加即可:
// NewConnection 创建新的连接
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
c := &Connection{
TCPServer: server,
Conn: conn,
ConnID: connID,
isClosed: false,
Msghandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
msgChan: make(chan []byte), // msgChan 初始化
}
// 将新创建的 Conn 添加到连接管理器中
c.TCPServer.GetConnMgr().Add(c)
return c
}
Server 中连接数量的判断
回顾我们引入连接管理模块的动机,我们建立连接管理模块,是为了控制连接的数量,以避免服务器同时与过多的客户端建立连接。因此,当连接建立之前,我们应该首先判断一下,当前与 Server 连接的 Client 的数量是否达到上限,如果达到了上限,那么就拒绝连接。实现的方法非常简单,在 Server 的 Start 方法下加入一个 if 条件句即可:
func (s *Server) Start() {
fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)
go func() {
// ....
// 3 启动server网络连接业务
for {
// 3.1 阻塞等待客户端建立连接请求
conn, err := listenner.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
// 3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
if s.ConnMgr.Len() >= settings.Conf.MaxConn {
conn.Close()
continue
}
//=============
// 3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConntion(s, conn, cid, s.msgHandler)
cid ++
// 3.4 启动当前链接的处理业务
go dealConn.Start()
}
}()
}
当然,我们应该在 yaml 中定义好我们期望服务器连接的最大数目。
还有一点就是,服务器的连接控制或许可以引入 LRU 这类算法,比如长时间没有消息收发的 Client 可以做断连处理,从而接入新的连接。
连接的删除
在连接停止时,我们应该将连接从 ConnManager 当中删除,因此在 Connection 的 Stop 方法中使用 ConnManager 的 Remove:
// Stop 停止连接, 结束当前连接状态
func (c *Connection) Stop() {
fmt.Println("Conn Stop()... ConnID = ", c.ConnID)
// 1. 如果当前连接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
fmt.Println("Im here")
// 关闭 socket 连接
c.Conn.Close()
// 通知从缓冲队列读数据的业务, 该链接已经关闭
c.ExitBuffChan <- true
// 将连接从管理器中删除
c.TCPServer.GetConnMgr().Remove(c)
// 关闭该链接全部管道
close(c.ExitBuffChan)
close(c.msgBuffChan)
}
在 Server Stop 的时候,应该将连接全部清空,调用 ClearConn:
func (s *Server) Stop() {
fmt.Println("[STOP] Zinx server , name ", s.Name)
// Server.Stop() 将其它需要清理的连接信息或其他信息一并停止或清理
s.ConnMgr.ClearConn()
}
至此我们成功地将连接管理模块集成到了 Zinx 当中。
补充:连接的带缓冲发包方式
这一部分与连接管理的关联不大,但是与 Zinx 的教学文档同时被列在第九节,因此作为补充写在连接管理的下方。
我们之前给 Connection 提供了一个 SendMsg()
方法,SendMsg()
当中,数据将会被发送到无缓冲的 msgChan
当中。如果客户端的连接较多,处理不及时,可能会产生短暂的阻塞,因此我们可以补充一个带有缓冲的消息发送方法,来做一些非阻塞的用户体验。
首先修改 Connection 的接口 IConnection,添加将消息发送到带缓冲区的通道的方法:
type IConnection interface {
Start() // 启动连接
Stop() // 停止连接
GetConnID() uint32 // 获取远程客户端地址信息
GetTCPConnection() *net.TCPConn // 从当前连接获取原始的 socket TCPConn
RemoteAddr() net.Addr // 获取远程客户端地址信息
SendMsg(msgId uint32, data []byte) error // 直接将 Message 数据发给远程的 TCP 客户端
SendBuffMsg(msgId uint32, data []byte) error // 添加带缓冲的发送消息接口
}
之后为 Connection 结构加入带缓冲区的 msgChan,并补充 SendBuffMsg(...)
方法:
type Connection struct {
TCPServer ziface.IServer // 标记当前 Conn 属于哪个 Server
Conn *net.TCPConn // 当前连接的 socket TCP 套接字
ConnID uint32 // 当前连接的 ID, 也可称为 SessionID, 全局唯一
isClosed bool // 当前连接的开启/关闭状态
Msghandler ziface.IMsgHandle // 将 Router 替换为消息管理模块
ExitBuffChan chan bool // 告知该连接一经退出/停止的 channel
msgChan chan []byte // 无缓冲 channel, 用于读/写两个 goroutine 之间的消息通信
msgBuffChan chan []byte // 定义 msgBuffChan
}
// NewConnection 创建新的连接
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
c := &Connection{
TCPServer: server,
Conn: conn,
ConnID: connID,
isClosed: false,
Msghandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
msgChan: make(chan []byte), // msgChan 初始化
msgBuffChan: make(chan []byte, settings.Conf.MaxMsgChanLen),
}
// 将新创建的 Conn 添加到连接管理器中
c.TCPServer.GetConnMgr().Add(c)
return c
}
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send buff msg")
}
// 将 data 封包并发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg")
}
c.msgBuffChan <- msg
return nil
}
我们在 Writer 中也要对 msgBuffChan 进行数据监控,如果有数据发送到这个 channel,应当将数据写给客户端,因此在 select 中添加一个接收 msgBuffChan 数据的 case 事件:
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
for {
select {
case data := <-c.msgChan:
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Data error:", err, " Conn Writer exit~")
return
}
case data, ok := <-c.msgBuffChan:
if ok {
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Buff Data error:", err, " Conn Writer exit")
return
}
} else {
// 注意: 这里刚才写错了, else 的对象是 ok 的 if 语句而不是 err 的 if 语句
fmt.Println("msgBuffChan is Closed")
return
}
case <-c.ExitBuffChan:
// conn 关闭
return
}
}
}
补充:注册连接启动/停止时自定义的 hook 方法
有时在创建连接之后,或是在连接断开之前,我们希望执行一些用户自定义的业务,所以我们为 Zinx 新增两个连接断开前和创建后的回调函数,它们也成为 hook(钩子)函数。
我们通过 Server 来注册 conn 的 hook 方法,首先修改 IServer 接口:
// 定义服务器接口
type IServer interface {
Start() // Start 启动服务器方法
Stop() // Stop 停止服务器方法
Serve() // Serve 开启服务器方法
AddRouter(msgId uint32, router IRouter) // 路由功能: 给当前服务注册一个路由业务方法
GetConnMgr() IConnManager // 得到连接管理器
SetOnConnStart(func(IConnection)) // 设置该 Server 在连接创建时的 hook 函数
SetOnConnStop(func(IConnection)) // 设置该 Server 在连接断开时的 hook 函数
CallOnConnStart(conn IConnection) // 调用连接 onConnStart Hook 函数
CallOnConnStop(conn IConnection) // 调用连接 onConnStop Hook 函数
}
再修改 Server:
type Server struct {
Name string // Name 为服务器的名称
IPVersion string // IPVersion: IPv4 or other
IP string // IP: 服务器绑定的 IP 地址
Port int // Port: 服务器绑定的端口
msgHandler ziface.IMsgHandle // 将 Router 替换为 MsgHandler, 绑定 MsgId 与对应的处理方法
ConnMgr ziface.IConnManager // 当前 Server 的连接管理器
onConnStart func(conn ziface.IConnection) // Server 在连接创建时的 Hook 函数
onConnStop func(conn ziface.IConnection) // Server 在连接删除时的 Hook 函数
}
为 Server 实现 IServer 的方法:
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {
s.onConnStart = hookFunc
}
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {
s.onConnStop = hookFunc
}
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
if s.onConnStart != nil {
fmt.Println("---> CallOnConnStart ...")
s.onConnStart(conn)
}
}
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
if s.onConnStop != nil {
fmt.Println("---> CallOnConnStop ...")
s.onConnStop(conn)
}
}
在连接创建的后调用 hook 方法:
// Start 实现 IConnection 中的方法, 它启动连接并让当前连接开始工作
func (c *Connection) Start() {
// 开启处理该连接读取到客户端数据之后的业务请求
go c.StartWriter()
go c.StartReader()
c.TCPServer.CallOnConnStart(c)
for {
select {
case <-c.ExitBuffChan:
// 得到退出消息则不再阻塞
return
}
}
}
停止连接前调用 hook 方法:
// Stop 停止连接, 结束当前连接状态
func (c *Connection) Stop() {
fmt.Println("Conn Stop()... ConnID = ", c.ConnID)
// 1. 如果当前连接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
fmt.Println("Im here")
// Connection Stop() 如果用户注册了该连接的关闭回调业务, 那么应该在此刻显式调用
c.TCPServer.CallOnConnStop(c)
// 关闭 socket 连接
c.Conn.Close()
// 通知从缓冲队列读数据的业务, 该链接已经关闭
c.ExitBuffChan <- true
// 将连接从管理器中删除
c.TCPServer.GetConnMgr().Remove(c)
// 关闭该链接全部管道
close(c.ExitBuffChan)
close(c.msgBuffChan)
}