文章目录
- 前言
- 一、go-sql-driver/mysql
- 1、驱动注册:sql.Register
- 2、驱动实现:MysqlDriver
- 3、RegisterDialContext
- 二、总结
前言
在上篇文章中我们知道,database/sql只是提供了驱动相关的接口,并没有相关的具体实现,具体内容是由第三方实现的,如go-sql-driver/mysql
:https://github.com/go-sql-driver/mysql/,本章中我们主要是探究这个驱动实现库的具体实现。以及它是如何与database/sql一起作用的。
一、go-sql-driver/mysql
go-sql-driver作为一个三方驱动库,主要就是实现database/sql中的驱动接口了,因此,主要的文件也就是driver.go、connector.go和connection.go几个文件了。因此,本章的阅读业主要聚焦与这三个文件中的源码内容。
1、驱动注册:sql.Register
通常,我们都会这样调用database/sql的Open方法创建一个db实例:
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
// ...
db, err := sql.Open("mysql", "user:password@/dbname")
if err != nil {
panic(err)
}
初看是不是觉得很奇怪,在这段代码中,我们没有直接使用到go-sql-driver的的任何东西,但却需要引入这个包,这是因为,sql.Open方法中,我们知道,会检查获取对应的驱动,而驱动的注册是由第三方驱动实现包调用Register方法完成的。
在go-sql-driver中的driver.go中,我们发现init函数中会调用Register方法注册相应的驱动,这也是上面的代码中为什么需要引入这个包的原因。
func init() {
if driverName != "" {
sql.Register(driverName, &MySQLDriver{})
}
}
2、驱动实现:MysqlDriver
在go-sql-driver中,核心的driver.go中实现了具体的mysql驱动(MysqlDriver)
// Open new Connection.
// See https://github.com/go-sql-driver/mysql#dsn-data-source-name for how
// the DSN string is formatted
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
cfg, err := ParseDSN(dsn)
if err != nil {
return nil, err
}
c := newConnector(cfg)
return c.Connect(context.Background())
}
在该方法中,首先从数据源dsn中解析出对应的配置,然后再构造对应的连接器,调用连接器的Connect方法与mysql建立连接。
connector实现了driver.Connector接口,其中Connect方法主要是与mysql进行交互,包括:拨号(dial)、认证、利用mysql协议发包与收包处理结果等,
type connector struct {
cfg *Config // immutable private copy.
encodedAttributes string // Encoded connection attributes.
}
func newConnector(cfg *Config) *connector {
encodedAttributes := encodeConnectionAttributes(cfg)
return &connector{
cfg: cfg,
encodedAttributes: encodedAttributes,
}
}
// Connect implements driver.Connector interface.
// Connect returns a connection to the database.
func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
var err error
// Invoke beforeConnect if present, with a copy of the configuration
cfg := c.cfg
if c.cfg.beforeConnect != nil {
cfg = c.cfg.Clone()
err = c.cfg.beforeConnect(ctx, cfg)
if err != nil {
return nil, err
}
}
// New mysqlConn
mc := &mysqlConn{
maxAllowedPacket: maxPacketSize,
maxWriteSize: maxPacketSize - 1,
closech: make(chan struct{}),
cfg: cfg,
connector: c,
}
mc.parseTime = mc.cfg.ParseTime
// Connect to Server
dialsLock.RLock()
dial, ok := dials[mc.cfg.Net]
dialsLock.RUnlock()
if ok {
dctx := ctx
if mc.cfg.Timeout > 0 {
var cancel context.CancelFunc
dctx, cancel = context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
}
mc.netConn, err = dial(dctx, mc.cfg.Addr)
} else {
nd := net.Dialer{Timeout: mc.cfg.Timeout}
mc.netConn, err = nd.DialContext(ctx, mc.cfg.Net, mc.cfg.Addr)
}
if err != nil {
return nil, err
}
mc.rawConn = mc.netConn
// Enable TCP Keepalives on TCP connections
if tc, ok := mc.netConn.(*net.TCPConn); ok {
if err := tc.SetKeepAlive(true); err != nil {
c.cfg.Logger.Print(err)
}
}
// Call startWatcher for context support (From Go 1.8)
mc.startWatcher()
if err := mc.watchCancel(ctx); err != nil {
mc.cleanup()
return nil, err
}
defer mc.finish()
mc.buf = newBuffer(mc.netConn)
// Set I/O timeouts
mc.buf.timeout = mc.cfg.ReadTimeout
mc.writeTimeout = mc.cfg.WriteTimeout
// Reading Handshake Initialization Packet
authData, plugin, err := mc.readHandshakePacket()
if err != nil {
mc.cleanup()
return nil, err
}
if plugin == "" {
plugin = defaultAuthPlugin
}
// Send Client Authentication Packet
authResp, err := mc.auth(authData, plugin)
if err != nil {
// try the default auth plugin, if using the requested plugin failed
c.cfg.Logger.Print("could not use requested auth plugin '"+plugin+"': ", err.Error())
plugin = defaultAuthPlugin
authResp, err = mc.auth(authData, plugin)
if err != nil {
mc.cleanup()
return nil, err
}
}
if err = mc.writeHandshakeResponsePacket(authResp, plugin); err != nil {
mc.cleanup()
return nil, err
}
// Handle response to auth packet, switch methods if possible
if err = mc.handleAuthResult(authData, plugin); err != nil {
// Authentication failed and MySQL has already closed the connection
// (https://dev.mysql.com/doc/internals/en/authentication-fails.html).
// Do not send COM_QUIT, just cleanup and return the error.
mc.cleanup()
return nil, err
}
if mc.cfg.MaxAllowedPacket > 0 {
mc.maxAllowedPacket = mc.cfg.MaxAllowedPacket
} else {
// Get max allowed packet size
maxap, err := mc.getSystemVar("max_allowed_packet")
if err != nil {
mc.Close()
return nil, err
}
mc.maxAllowedPacket = stringToInt(maxap) - 1
}
if mc.maxAllowedPacket < maxPacketSize {
mc.maxWriteSize = mc.maxAllowedPacket
}
// Handle DSN Params
err = mc.handleParams()
if err != nil {
mc.Close()
return nil, err
}
return mc, nil
}
// Driver implements driver.Connector interface.
// Driver returns &MySQLDriver{}.
func (c *connector) Driver() driver.Driver {
return &MySQLDriver{}
}
同时,我们还注意到,Connect方法中调用了一个startWatcher方法,该方法从watcher通道中接收一个ctx,并对这个ctx进行监听,每次都会调用一个watchCancel方法将ctx传递Watcher,watcher监听到ctx.Done的信号后,将会调用cancel方法,启动清理工作。
func (mc *mysqlConn) startWatcher() {
watcher := make(chan context.Context, 1)
mc.watcher = watcher
finished := make(chan struct{})
mc.finished = finished
go func() {
for {
var ctx context.Context
select {
case ctx = <-watcher:
case <-mc.closech:
return
}
select {
case <-ctx.Done():
mc.cancel(ctx.Err())
case <-finished:
case <-mc.closech:
return
}
}
}()
}
cancel方法将会调用cleanup方法进行连接的清理工作,可以看到在cleanup中调用了conn.Close,将这个物理连接关闭掉。因此,我们在使用QueryContext或者ExecContext时候,如果ctx设置了超时时间,或者主动cancel,那么意味着这个连接将会被断掉。极端情况下,大量连接同时超时,意味着连接都将失效,此时再有新的请求打进来则会重新建立新的连接,会有一定的连接建立开销。由于连接池是database/sql维护的,因此这也只是客户端(或者说mysql sdk)层面的失效,mysql server接收到的sql执行是不会被中断的。
// finish is called when the query has canceled.
func (mc *mysqlConn) cancel(err error) {
mc.canceled.Set(err)
mc.cleanup()
}
// Closes the network connection and unsets internal variables. Do not call this
// function after successfully authentication, call Close instead. This function
// is called before auth or on auth failure because MySQL will have already
// closed the network connection.
func (mc *mysqlConn) cleanup() {
if mc.closed.Swap(true) {
return
}
// Makes cleanup idempotent
close(mc.closech)
conn := mc.rawConn
if conn == nil {
return
}
if err := conn.Close(); err != nil {
mc.log(err)
}
// This function can be called from multiple goroutines.
// So we can not mc.clearResult() here.
// Caller should do it if they are in safe goroutine.
}
在实际项目中,为了减少使用层面的超时导致连接失效这种情况,我们也可以对mysql server设置一个wait_timeout时间,并且调用QueryContext/ExecContext的超时时间要小于这个wait_timeout时间,这样则不会由于某业务中有慢查的sql,导致ctx超时,从而频繁触发连接的重新建立。
3、RegisterDialContext
最后我们再看看下这个静态方法:RegisterDialContext,这个方法主要作用就是注册对应的协议的dialFunc,便于在进行数据库连接时候找到真正的地址。
// RegisterDialContext registers a custom dial function. It can then be used by the
// network address mynet(addr), where mynet is the registered new network.
// The current context for the connection and its address is passed to the dial function.
func RegisterDialContext(net string, dial DialContextFunc) {
dialsLock.Lock()
defer dialsLock.Unlock()
if dials == nil {
dials = make(map[string]DialContextFunc)
}
dials[net] = dial
}
// DialContextFunc is a function which can be used to establish the network connection.
// Custom dial functions must be registered with RegisterDialContext
type DialContextFunc func(ctx context.Context, addr string) (net.Conn, error)
二、总结
本篇文章我们看了go-sql-driver的具体实现,整体上来说,go-sql-driver都是实现database/sql的driver.Driver接口,直接对接mysql服务端,支持mysql协议的收发包,在api层面,query/exec两个方法都提供了带ctx的方法,带ctx和不带ctx的api使用差异,一点小小的切换可能导致不断频繁建立连接与关闭连接等,最后我们也根据实际的情况提出解决此问题的方案。