Zinx - V0.7 读写协程分离
- 之前connection使用StartReader对客户端的数据
- 接下来我们就要对Zinx做⼀个⼩⼩的改变,就是与客户端进修数据交互的Gouroutine由⼀个变成两个,⼀个专⻔负责从客户端读取数据,⼀个专⻔负责向客户端写数据。这么设计有什么好处,当然是⽬的就是⾼内聚,模块的功能单⼀
- Server依然是处理客户端的响应,主要关键的⼏个⽅法是Listen、Accept等。当建⽴与客户端的套接字后,那么就会开启两个Goroutine分别处理读数据业务和写数据业务,读写数据之间的消息通过⼀个Channel传递
实现思路
- connection.go
Connection定义添加channel
type Connection struct {
//当前链接的socket TCP套接字
Conn *net.TCPConn
//链接的ID
ConnID uint32
//当前的链接状态
isClosed bool
//告知当前链接已经退出的/停止 channel(由Reader告知Writer退出)
ExitChan chan bool
//无缓冲d管道,用于读、写Goroutine之间的消息通信
msgChan chan []byte
//消息的管理MsgID 和对应的处理业务API关系
MsgHandler ziface.IMsgHandle
}
初始化链接方法增加channel
//初始化链接模块的方法
func NewConnection(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
MsgHandler: msgHandler,
isClosed: false,
msgChan: make(chan []byte),
ExitChan: make(chan bool, 1),
}
return c
}
增加StartWriter方法
//写消息Goroutine, 专门发送给客户端消息的模块
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println("[conn Writer exit!]", c.RemoteAddr().String())
//不断的阻塞的等待channel的消息,进行写给客户端
for {
select {
case data := <-c.msgChan:
//有数据要写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error, ", err)
return
}
case <-c.ExitChan:
//代表Reader已经退出,此时Writer也要推出
return
}
}
}
使用for循环不断阻塞地等待channel的消息,回写给客户端,其中退出一定是由Reader告知Writer退出,因为我们能够知道客户端退出的时候一定是Reader有异常
修改Start方法分离读写go程
//启动链接 让当前的链接准备开始工作
func (c *Connection) Start() {
fmt.Println("Conn Start() ... ConnID = ", c.ConnID)
//启动从当前链接的读数据的业务
go c.StartReader()
//启动从当前链接写数据的业务
go c.StartWriter()
}
SendMsg中将数据通过channel发送给客户端
//提供一个SendMsg方法 将我们要发送给客户端的数据,先进行封包,再发送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send msg")
}
//将data进行封包 MsgDataLen|MsgID|Data
dp := NewDataPack()
//MsgDataLen|MsgID|Data
binaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg")
}
//将数据发送给客户端
c.msgChan <- binaryMsg
return nil
}
Stop方法添加告知Writer关闭
//停止链接 结束当前链接的工作
func (c *Connection) Stop() {
fmt.Println("Conn Stop().. ConnID = ", c.ConnID)
//如果当前链接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
//关闭socket链接
c.Conn.Close()
//告知Writer关闭
c.ExitChan <- true
//回收资源
close(c.ExitChan)
close(c.msgChan)
}
当Reader退出时向ExitChan写入数据,只有Reader出现错误都会break,然后调用Stop方法,所以我们可以在Stop函数中将这个消息发送给Writer
整体思路
在Connection结构体中加入一个用于读写Goroutine通信的管道,在NewConnection中对管道做初始化处理,在StartReader()中做一些业务处理,读完数据后发送数据调用DoMsgHandler方法,在DoMsgHandler方法中又会调用Handle方法。当服务器端调用Handle方法时会调用SendMsg方法把消息发给客户端,以前是直接回写给客户端,现在是将消息发送给msgChan管道,此时StartWriter()就会得到管道传过来的数据,然后再将其发给客户端。当退出时StartReader检测到客户端退出,因为他会根据连接进行阻塞等待请求,客户端退出后会break跳出for循环,然后调用Stop方法,在Stop方法中关闭连接、告知Writer退出并回收资源,然后StartWriter得到消息就会退出