【源码阅读】Golang中的go-sql-driver库源码探究

news2024/10/5 17:03:29

文章目录

    • 前言
    • 一、go-sql-driver/mysql
      • 1、驱动注册:sql.Register
      • 2、驱动实现:MysqlDriver
      • 3、RegisterDialContext
    • 二、总结

前言

在上篇文章中我们知道,database/sql只是提供了驱动相关的接口,并没有相关的具体实现,具体内容是由第三方实现的,如go-sql-driver/mysql:https://github.com/go-sql-driver/mysql/,本章中我们主要是探究这个驱动实现库的具体实现。以及它是如何与database/sql一起作用的。

一、go-sql-driver/mysql

go-sql-driver作为一个三方驱动库,主要就是实现database/sql中的驱动接口了,因此,主要的文件也就是driver.go、connector.go和connection.go几个文件了。因此,本章的阅读业主要聚焦与这三个文件中的源码内容。
在这里插入图片描述

1、驱动注册:sql.Register

通常,我们都会这样调用database/sql的Open方法创建一个db实例:

import (
	"database/sql"

	_ "github.com/go-sql-driver/mysql"
)

// ...

db, err := sql.Open("mysql", "user:password@/dbname")
if err != nil {
	panic(err)
}

初看是不是觉得很奇怪,在这段代码中,我们没有直接使用到go-sql-driver的的任何东西,但却需要引入这个包,这是因为,sql.Open方法中,我们知道,会检查获取对应的驱动,而驱动的注册是由第三方驱动实现包调用Register方法完成的。

在go-sql-driver中的driver.go中,我们发现init函数中会调用Register方法注册相应的驱动,这也是上面的代码中为什么需要引入这个包的原因。

func init() {
	if driverName != "" {
		sql.Register(driverName, &MySQLDriver{})
	}
}

2、驱动实现:MysqlDriver

在go-sql-driver中,核心的driver.go中实现了具体的mysql驱动(MysqlDriver)

// Open new Connection.
// See https://github.com/go-sql-driver/mysql#dsn-data-source-name for how
// the DSN string is formatted
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
	cfg, err := ParseDSN(dsn)
	if err != nil {
		return nil, err
	}
	c := newConnector(cfg)
	return c.Connect(context.Background())
}

在该方法中,首先从数据源dsn中解析出对应的配置,然后再构造对应的连接器,调用连接器的Connect方法与mysql建立连接。

connector实现了driver.Connector接口,其中Connect方法主要是与mysql进行交互,包括:拨号(dial)、认证、利用mysql协议发包与收包处理结果等,

type connector struct {
	cfg               *Config // immutable private copy.
	encodedAttributes string  // Encoded connection attributes.
}

func newConnector(cfg *Config) *connector {
	encodedAttributes := encodeConnectionAttributes(cfg)
	return &connector{
		cfg:               cfg,
		encodedAttributes: encodedAttributes,
	}
}

// Connect implements driver.Connector interface.
// Connect returns a connection to the database.
func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
	var err error

	// Invoke beforeConnect if present, with a copy of the configuration
	cfg := c.cfg
	if c.cfg.beforeConnect != nil {
		cfg = c.cfg.Clone()
		err = c.cfg.beforeConnect(ctx, cfg)
		if err != nil {
			return nil, err
		}
	}

	// New mysqlConn
	mc := &mysqlConn{
		maxAllowedPacket: maxPacketSize,
		maxWriteSize:     maxPacketSize - 1,
		closech:          make(chan struct{}),
		cfg:              cfg,
		connector:        c,
	}
	mc.parseTime = mc.cfg.ParseTime

	// Connect to Server
	dialsLock.RLock()
	dial, ok := dials[mc.cfg.Net]
	dialsLock.RUnlock()
	if ok {
		dctx := ctx
		if mc.cfg.Timeout > 0 {
			var cancel context.CancelFunc
			dctx, cancel = context.WithTimeout(ctx, c.cfg.Timeout)
			defer cancel()
		}
		mc.netConn, err = dial(dctx, mc.cfg.Addr)
	} else {
		nd := net.Dialer{Timeout: mc.cfg.Timeout}
		mc.netConn, err = nd.DialContext(ctx, mc.cfg.Net, mc.cfg.Addr)
	}
	if err != nil {
		return nil, err
	}
	mc.rawConn = mc.netConn

	// Enable TCP Keepalives on TCP connections
	if tc, ok := mc.netConn.(*net.TCPConn); ok {
		if err := tc.SetKeepAlive(true); err != nil {
			c.cfg.Logger.Print(err)
		}
	}

	// Call startWatcher for context support (From Go 1.8)
	mc.startWatcher()
	if err := mc.watchCancel(ctx); err != nil {
		mc.cleanup()
		return nil, err
	}
	defer mc.finish()

	mc.buf = newBuffer(mc.netConn)

	// Set I/O timeouts
	mc.buf.timeout = mc.cfg.ReadTimeout
	mc.writeTimeout = mc.cfg.WriteTimeout

	// Reading Handshake Initialization Packet
	authData, plugin, err := mc.readHandshakePacket()
	if err != nil {
		mc.cleanup()
		return nil, err
	}

	if plugin == "" {
		plugin = defaultAuthPlugin
	}

	// Send Client Authentication Packet
	authResp, err := mc.auth(authData, plugin)
	if err != nil {
		// try the default auth plugin, if using the requested plugin failed
		c.cfg.Logger.Print("could not use requested auth plugin '"+plugin+"': ", err.Error())
		plugin = defaultAuthPlugin
		authResp, err = mc.auth(authData, plugin)
		if err != nil {
			mc.cleanup()
			return nil, err
		}
	}
	if err = mc.writeHandshakeResponsePacket(authResp, plugin); err != nil {
		mc.cleanup()
		return nil, err
	}

	// Handle response to auth packet, switch methods if possible
	if err = mc.handleAuthResult(authData, plugin); err != nil {
		// Authentication failed and MySQL has already closed the connection
		// (https://dev.mysql.com/doc/internals/en/authentication-fails.html).
		// Do not send COM_QUIT, just cleanup and return the error.
		mc.cleanup()
		return nil, err
	}

	if mc.cfg.MaxAllowedPacket > 0 {
		mc.maxAllowedPacket = mc.cfg.MaxAllowedPacket
	} else {
		// Get max allowed packet size
		maxap, err := mc.getSystemVar("max_allowed_packet")
		if err != nil {
			mc.Close()
			return nil, err
		}
		mc.maxAllowedPacket = stringToInt(maxap) - 1
	}
	if mc.maxAllowedPacket < maxPacketSize {
		mc.maxWriteSize = mc.maxAllowedPacket
	}

	// Handle DSN Params
	err = mc.handleParams()
	if err != nil {
		mc.Close()
		return nil, err
	}

	return mc, nil
}

// Driver implements driver.Connector interface.
// Driver returns &MySQLDriver{}.
func (c *connector) Driver() driver.Driver {
	return &MySQLDriver{}
}

同时,我们还注意到,Connect方法中调用了一个startWatcher方法,该方法从watcher通道中接收一个ctx,并对这个ctx进行监听,每次都会调用一个watchCancel方法将ctx传递Watcher,watcher监听到ctx.Done的信号后,将会调用cancel方法,启动清理工作。

func (mc *mysqlConn) startWatcher() {
	watcher := make(chan context.Context, 1)
	mc.watcher = watcher
	finished := make(chan struct{})
	mc.finished = finished
	go func() {
		for {
			var ctx context.Context
			select {
			case ctx = <-watcher:
			case <-mc.closech:
				return
			}

			select {
			case <-ctx.Done():
				mc.cancel(ctx.Err())
			case <-finished:
			case <-mc.closech:
				return
			}
		}
	}()
}

cancel方法将会调用cleanup方法进行连接的清理工作,可以看到在cleanup中调用了conn.Close,将这个物理连接关闭掉。因此,我们在使用QueryContext或者ExecContext时候,如果ctx设置了超时时间,或者主动cancel,那么意味着这个连接将会被断掉。极端情况下,大量连接同时超时,意味着连接都将失效,此时再有新的请求打进来则会重新建立新的连接,会有一定的连接建立开销。由于连接池是database/sql维护的,因此这也只是客户端(或者说mysql sdk)层面的失效,mysql server接收到的sql执行是不会被中断的。

// finish is called when the query has canceled.
func (mc *mysqlConn) cancel(err error) {
	mc.canceled.Set(err)
	mc.cleanup()
}

// Closes the network connection and unsets internal variables. Do not call this
// function after successfully authentication, call Close instead. This function
// is called before auth or on auth failure because MySQL will have already
// closed the network connection.
func (mc *mysqlConn) cleanup() {
	if mc.closed.Swap(true) {
		return
	}

	// Makes cleanup idempotent
	close(mc.closech)
	conn := mc.rawConn
	if conn == nil {
		return
	}
	if err := conn.Close(); err != nil {
		mc.log(err)
	}
	// This function can be called from multiple goroutines.
	// So we can not mc.clearResult() here.
	// Caller should do it if they are in safe goroutine.
}

在实际项目中,为了减少使用层面的超时导致连接失效这种情况,我们也可以对mysql server设置一个wait_timeout时间,并且调用QueryContext/ExecContext的超时时间要小于这个wait_timeout时间,这样则不会由于某业务中有慢查的sql,导致ctx超时,从而频繁触发连接的重新建立。

3、RegisterDialContext

最后我们再看看下这个静态方法:RegisterDialContext,这个方法主要作用就是注册对应的协议的dialFunc,便于在进行数据库连接时候找到真正的地址。

// RegisterDialContext registers a custom dial function. It can then be used by the
// network address mynet(addr), where mynet is the registered new network.
// The current context for the connection and its address is passed to the dial function.
func RegisterDialContext(net string, dial DialContextFunc) {
	dialsLock.Lock()
	defer dialsLock.Unlock()
	if dials == nil {
		dials = make(map[string]DialContextFunc)
	}
	dials[net] = dial
}

// DialContextFunc is a function which can be used to establish the network connection.
// Custom dial functions must be registered with RegisterDialContext
type DialContextFunc func(ctx context.Context, addr string) (net.Conn, error)

二、总结

本篇文章我们看了go-sql-driver的具体实现,整体上来说,go-sql-driver都是实现database/sql的driver.Driver接口,直接对接mysql服务端,支持mysql协议的收发包,在api层面,query/exec两个方法都提供了带ctx的方法,带ctx和不带ctx的api使用差异,一点小小的切换可能导致不断频繁建立连接与关闭连接等,最后我们也根据实际的情况提出解决此问题的方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1638267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PG数据库结构与oracle比较

1.数据库集簇逻辑结构 数据库集簇概念&#xff1a;一个大的数据库是由若干个小的数据库组成&#xff0c;实现数据的隔离存放&#xff0c;在概念上应该是与mysql一样的 在mysql中可以用show database列出数据库 PG中用\l 数据库对象存放在数据库中&#xff1a; PG中的所有数据…

Mac 上安装多版本的 JDK 且实现 自由切换

背景 当前电脑上已经安装了 jdk8; 现在再安装 jdk17。 期望 完成 jdk17 的安装&#xff0c;并且完成 环境变量 的配置&#xff0c;实现自由切换。 前置补充知识 jdk 的安装路径 可以通过查看以下目录中的内容&#xff0c;确认当前已经安装的 jdk 版本。 cd /Library/Java/Java…

Maven3.9.6下载安装教程

(/≧▽≦)/~┴┴ 嗨~我叫小奥 ✨✨✨ &#x1f440;&#x1f440;&#x1f440; 个人博客&#xff1a;小奥的博客 &#x1f44d;&#x1f44d;&#x1f44d;&#xff1a;个人CSDN ⭐️⭐️⭐️&#xff1a;Github传送门 &#x1f379; 本人24应届生一枚&#xff0c;技术和水平有…

Typescript精进:前端必备的5大技巧(AI写作)

首先&#xff0c;这篇文章是基于笔尖AI写作进行文章创作的&#xff0c;喜欢的宝子&#xff0c;也可以去体验下&#xff0c;解放双手&#xff0c;上班直接摸鱼~ 按照惯例&#xff0c;先介绍下这款笔尖AI写作&#xff0c;宝子也可以直接下滑跳过看正文~ 笔尖Ai写作&#xff1a;…

瑞_23种设计模式_解释器模式

文章目录 1 解释器模式&#xff08;Interpreter Pattern&#xff09;1.1 介绍1.2 概述1.2.1 文法&#xff08;语法&#xff09;规则1.2.2 抽象语法树 1.3 解释器模式的结构1.4 解释器模式的优缺点1.5 解释器模式的使用场景 2 案例一2.1 需求2.2 代码实现 3 案例二3.1 需求3.2 代…

【右一的开发日记】全导航,持续更新...

文章目录 &#x1f4da;前端【跟课笔记】&#x1f407;核心技术&#x1f407;高级技术 &#x1f4da;捣鼓捣鼓&#x1f407;小小案例&#x1f407;喵喵大王立大功&#x1f407;TED自用学习辅助网站&#x1f407;世界top2000计算机科学家可视化大屏&#x1f407;基于CBDB的唐代历…

【Java EE】MyBatis使用注解操作数据库

文章目录 &#x1f340;参数传递&#x1f334;增(Insert)&#x1f338;返回主键 &#x1f343;删(Delete)&#x1f333;改(Update)&#x1f332;查(Select)&#x1f338;起别名&#x1f338;结果映射&#x1f338;开启驼峰命名(推荐) ⭕总结 &#x1f340;参数传递 需求: 查找…

【JavaEE】进程的概念

文章目录 1、什么是进程&#xff08;Process&#xff09;2、PCB1.pid进程的id/标识符2.内存指针3.文件描述符表4、进程调度4.1状态4.2优先级4.3上下文4.4记账信息 1、什么是进程&#xff08;Process&#xff09; 一个程序&#xff0c;运行起来/跑起来&#xff0c;在操作系统中…

Delta lake with Java--利用spark sql操作数据1

今天要解决的问题是如何使用spark sql 建表&#xff0c;插入数据以及查询数据 1、建立一个类叫 DeltaLakeWithSparkSql1&#xff0c;具体代码如下&#xff0c;例子参考Delta Lake Up & Running第3章内容 import org.apache.spark.sql.SaveMode; import org.apache.spark.…

Ollamallama

Olllama 直接下载ollama程序&#xff0c;安装后可在cmd里直接运行大模型&#xff1b; llama 3 meta 开源的最新llama大模型&#xff1b; 下载运行 1 ollama ollama run llama3 2 github 下载仓库&#xff0c;需要linux环境&#xff0c;windows可使用wsl&#xff1b; 接…

面试:Spring(IOC、AOP、事务失效、循环引用、SpringMVC、SpringBoot的自动配置原理、Spring框架常见注解)

目录 一、Spring的单例Bean是否是线程安全的&#xff1f; 二、什么是AOP 1、介绍 &#xff08;1&#xff09;记录操作日志 &#xff08;2&#xff09;实现Spring中的事务 三、spring中事务失效的场景有哪些&#xff1f; 1、异常捕获处理 2、抛出检查异常 3、非public方…

ElasticSearch教程入门到精通——第四部分(基于ELK技术栈elasticsearch 7.x新特性)

ElasticSearch教程入门到精通——第四部分&#xff08;基于ELK技术栈elasticsearch 7.x新特性&#xff09; 1. Elasticsearch进阶1.1 核心概念1.1.1 索引Index1.1.1.1 索引创建原则1.1.1.2 Inverted Index 1.1.2 类型Type1.1.3 文档Document1.1.4 字段Field1.1.5 映射Mapping1.…

【Mac】Mac安装软件常见问题解决办法

前言 刚开始用Mac系统的小伙伴或者在更新系统版本后运行App的朋友会经常碰到弹窗提示「xxx已损坏&#xff0c;无法打开&#xff0c;您应该将它移到废纸篓」、「打不开xxx&#xff0c;因为Apple无法检查其是否包含恶意软件」、「打不开xxx&#xff0c;因为它来自身份不明的开发…

模型训练中的过拟合和欠拟合

基本概念 我们知道&#xff0c;所谓的神经网络其实就是一个复杂的非线性函数&#xff0c;网络越深&#xff0c;这个函数就越复杂&#xff0c;相应的表达能力也就越强&#xff0c;神经网络的训练则是一个拟合的过程。   当模型的复杂度小于真实数据的复杂度&#xff0c;模型表…

保存钉钉群直播回放下载:直播回放下载步骤详解

今天&#xff0c;我们就来拨开云雾&#xff0c;揭开保存钉钉群直播回放的神秘面纱。教会你们如何下载钉钉群直播回放 首先用到的工具我全部打包好了&#xff0c;有需要的自己下载一下 钉钉群直播回放工具下载&#xff1a;https://pan.baidu.com/s/1WVMNGoKcTwR_NDpvFP2O2A?p…

PyQt5新手教程(五万字)

文章目录 PyQt界面开发的两种方式&#xff1a;可视化UI 编程式UI一、PyQt 简介二、PyQt 与 Qt 的蒙娜丽莎三、PyQt 布局管理器3.1、简介3.2、项目实战3.2.0、添加伸缩项 layout.addStretch&#xff1a;控制布局中组件之间的间距。3.2.1、垂直布局管理器 QVBoxLayout&#xff1…

制作一个 rpm 软件包

首发日期 2024-04-30, 以下为原文内容: 本文以 ibrus (艾刷, 胖喵拼音 ibus 接口模块) 为例, 介绍 rpm 软件包的制作过程. 相关文章: 《发布 AUR 软件包 (ArchLinux)》 https://blog.csdn.net/secext2022/article/details/136803790《多种双拼方案的实现》 https://blog.csdn.…

C语言之详细讲解文件操作(抓住文件操作的奥秘)

什么是文件 与普通文件载体不同&#xff0c;文件是以硬盘为载体存储在计算机上的信息集合&#xff0c;文件可以是文本文档、图片、程序等等。文件通常具有点三个字母的文件扩展名&#xff0c;用于指示文件类型&#xff08;例如&#xff0c;图片文件常常以KPEG格式保存并且文件…

区块链 | IPFS:Merkle DAG

&#x1f98a;原文&#xff1a;IPFS: Merkle DAG 数据结构 - 知乎 &#x1f98a;写在前面&#xff1a;本文属于搬运博客&#xff0c;自己留存学习。 1 Merkle DAG 的简介 Merkle DAG 是 IPFS 系统的核心概念之一。虽然 Merkle DAG 并不是由 IPFS 团队发明的&#xff0c;它来自…

Pyspark+关联规则 Kaggle购物篮分析案例

数据集地址&#xff1a;Market Basket Analysis | Kaggle 我的NoteBook地址&#xff1a;pyspark Market Basket Analysis | Kaggle 零售商期望能够利用过去的零售数据在自己的行业中进行探索&#xff0c;并为客户提供有关商品集的建议&#xff0c;这样就能提高客户参与度、改…