课程总目录
文章目录
- 一、项目介绍
- 1.1 关键技术点
- 1.2 项目背景
- 1.3 连接池功能点介绍
- 1.4 MySQL Server参数介绍
- 1.5 项目功能点设计和技术细节
- 二、MySQL数据库编程
- 三、项目代码逐步实现
- 3.1 连接池单例模式实现
- 3.2 实现加载配置项
- 3.3 连接池的构造函数
- 3.4 实现生产者
- 3.5 实现消费者
- 3.6 实现回收超过最大空闲时间的连接
- 四、连接池压力测试
- 五、完整源码下载
一、项目介绍
1.1 关键技术点
- MySQL数据库编程
- 单例模式
queue
队列容器- C++11多线程编程
- 线程互斥
- 线程同步通信和
unique_lock
- 基于CAS的原子整形
- 智能指针
shared_ptr
lambda
表达式- 生产者-消费者线程模型
1.2 项目背景
为了提高MySQL数据库(基于C/S设计)的访问瓶颈,除了在服务器端增加缓存服务器缓存常用的数据之外(例如redis),还可以增加连接池,来提高 MySQL Server 的访问效率,在高并发情况下,大量的TCP三次握手、MySQL Server 连接认证、MySQL Server 关闭连接回收资源和TCP四次挥手所耗费的性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗
在市场上比较流行的连接池包括阿里的druid、c3p0以及apache的dbcp连接池,它们对于短时间内大量 的数据库增删改查操作性能的提升是很明显的,但是它们有一个共同点是全部由Java实现的。
那么本项目就是为了在 C/C++ 项目中,提高 MySQL Server 的访问效率,实现基于C++代码的数据库连接池模块
1.3 连接池功能点介绍
连接池一般包含了数据库连接所用的 ip 地址、port 端口号、用户名和密码以及其它的性能参数,例如初始连接量、最大连接量、最大空闲时间、连接超时时间等,该项目是基于C++语言实现的连接池,主要也是实现以上几个所有连接池都支持的通用基础功能
- 初始连接量(initSize):表示连接池事先会和 MySQL Server 创建
initSize
个数量的connection
连接,当应用发起 MySQL 访问时,不用再创建和 MySQL Server 新的连接,直接从连接池中获取一个可用的连接就可以了,使用完成后,并不去释放connection
,而是把当前connection
再归还到连接池当中 - 最大连接量(maxSize):当并发访问 MySQL Server 的请求增多时,初始连接量
initSize
已经不够使用了,此时会根据新的请求数量去创建更多的连接给应用去使用,但是新创建的连接数量上限是maxSize
,不能无限制的创建连接,因为每一个连接都会占用一个socket资源,一般连接池和服务器程序是部署在一台主机上的,如果连接池占用过多的socket资源,那么服务器就不能接收太多的客户端请求了。当这些连接使用完成后,再次归还到连接池当中来维护 - 最大空闲时间(maxIdleTime):当访问 MySQL 的并发请求多了以后,连接池里面的连接数量会动态增加,上限是
maxSize
个,当这些连接用完会再次归还到连接池当中。如果在指定的maxIdleTime
里面,这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连接量initSize
个连接就可以了 - 连接超时时间(connectionTimeout):当 MySQL 的并发请求量过大,连接池中的连接数量已经到达最大连接量
maxSize
了,而此时没有空闲的连接可供使用,那么此时应用从连接池获取连接无法成功,会通过阻塞的方式等待获取连接,获取连接的时间如果超过connectionTimeout
时间,那么获取连接失败,无法访问数据库
1.4 MySQL Server参数介绍
show variables like 'max_connections';
该命令可以查看 MySQL Server 所支持的最大连接个数,超过max_connections
数量的连接,MySQL Server 会直接拒绝,所以在使用连接池增加连接数量的时候,MySQL Server 的max_connections
参数也要适当的进行调整,以适配连接池的连接上限
1.5 项目功能点设计和技术细节
项目结构:
ConnectionPool.cpp
和ConnectionPool.h
:连接池的代码实现Connection.cpp
和Connection.h
:数据库操作代码、增删改查代码实现
连接池主要包含了以下功能点:
- 连接池只需要一个实例,所以
ConnectionPool
以懒汉式单例模式进行设计 - 从
ConnectionPool
中可以获取和 MySQL 的Connection
连接 - 空闲的
Connection
连接全部维护在一个线程安全的盛放Connection
连接的queue
队列中,使用线程互斥锁保证队列的线程安全 - 如果
Connection
队列为空,还需要再获取连接,此时需要动态创建连接,上限数量是最大连接量maxSize
- 队列中空闲连接的时间超过最大空闲时间
maxIdleTime
时就要被释放掉,只保留初始连接量initSize
个连接就可以了,这个功能点需要放在独立的线程中去做 - 如果
Connection
队列为空,而此时连接的数量已达上限最大连接量maxSize
,那么等待connectionTimeout
时间(是等待这么多时间的期间获取连接,不是等待这么多时间之后再开始获取连接)如果还获取不到空闲的连接,那么获取连接失败,此处从Connection
队列获取空闲连接,可以使用带超时时间的mutex
互斥锁来实现连接超时时间 - 用户获取的连接用
shared_ptr
智能指针来管理,用lambda
表达式定制连接释放的功能(不真正释放 连接,而是把连接归还到连接池中) - 连接的生产和连接的消费采用生产者-消费者线程模型来设计,使用了线程间的同步通信机制的条件变量和互斥锁
二、MySQL数据库编程
安装MySQL这里就不赘述了,在做项目之前先检查MySQL是否正在运行
创建chat
数据库
create database chat;
use chat;
创建user
表
CREATE TABLE user (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(50) NOT NULL,
age INT NOT NULL,
sex ENUM('male', 'female') NOT NULL,
PRIMARY KEY (id)
);
项目结构如图:
由于MySQL安装的64位版本的,库也都是64位版本,我们的项目一定要设置为64位的
接下来配置头文件和库文件:
- 在『C/C++ → \to → 常规 → \to → 附加包含目录』或『VC++目录 → \to → 包含目录』中,填写 mysql.h 头文件的路径
- 把 libmysql.dll 动态链接库(Linux下后缀名是.so库)放在工程目录下
- 『链接器 → \to → 常规 → \to → 附加库目录』,填写 libmysql.lib 的路径
- 『链接器
→
\to
→ 输入
→
\to
→ 附加依赖项』,填写 libmysql.lib 库的名字
接下来,我们来进行MySQL数据库编程的测试
直接上代码,涉及到 public.h、Connection.h、Connection.cpp、main.cpp 五个文件
public.h:
#pragma once
#include <iostream>
#define LOG(str) \
std::cout << __FILE__ << ":"<<__LINE__<<" " \
<< __TIMESTAMP__ << ":"<<str <<std::endl;
Connection.h:
#pragma once
#include <mysql.h>
#include <string>
#include "public.h"
using namespace std;
// 实现MySQL数据库的操作
class Connection
{
public:
// 构造:初始化数据库连接
Connection();
// 析构:释放数据库连接资源
~Connection();
// 连接数据库
bool connect(string ip,
unsigned short port,
string user,
string password,
string dbname);
// 更新操作:insert、delete、update
bool update(string sql);
// 查询操作:select
MYSQL_RES* query(string sql);
private:
MYSQL* _conn; // 表示和MySQLServer的一条连接
};
Connection.cpp:
#include "public.h"
#include "Connection.h"
Connection::Connection()
{
// 初始化数据库连接
_conn = mysql_init(nullptr);
}
Connection::~Connection()
{
// 释放数据库连接资源
if (_conn != nullptr)
mysql_close(_conn);
}
bool Connection::connect(string ip, unsigned short port,
string user, string password, string dbname)
{
// 连接数据库
MYSQL* p = mysql_real_connect(_conn, ip.c_str(), user.c_str(),
password.c_str(), dbname.c_str(), port, nullptr, 0);
return p != nullptr;
}
bool Connection::update(string sql)
{
// 更新操作:insert、delete、update
if (mysql_real_query(_conn, sql.c_str(), (unsigned long)sql.length()))
{
LOG("更新失败:" + sql);
cout<< mysql error(_conn) << endl;
return false;
}
return true;
}
MYSQL_RES* Connection::query(string sql)
{
// 查询操作:select
if (mysql_real_query(_conn, sql.c_str(), (unsigned long)sql.length()))
{
LOG("查询失败:" + sql);
cout<< mysql error(_conn) << endl;
return nullptr;
}
return mysql_use_result(_conn);
}
main.cpp:
#include <iostream>
#include "Connection.h"
using namespace std;
int main()
{
Connection conn;
//string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
char sql[1024] = { 0 };
sprintf(sql, "insert into user(name,age,sex) values('%s','%d','%s');",
"zhang san", 20, "male");
conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
conn.update(sql); //更新sql语句
return 0;
}
可以在数据库中看到插入成功
三、项目代码逐步实现
先来把DBConnectionPool.h
设计好:
// 实现连接池功能模块
class ConnectionPool
{
public:
// 获取连接池对象实例:用静态方法来获取唯一实例
static ConnectionPool* getConnectionPool();
// 给外部提供接口,从连接池中获取一个可用的空闲连接(消费者)
shared_ptr<Connection> getConnection();
private:
// 单例模式:构造函数私有化
ConnectionPool();
// 单例模式:删除拷贝构造和赋值函数
ConnectionPool(const ConnectionPool&) = delete;
ConnectionPool& operator=(const ConnectionPool&) = delete;
// 从配置文件中加载配置项
bool loadConfigFile();
// 生产者线程函数:运行在独立的线程中,专门负责生产新连接
// 写成成员方法,方便访问成员变量
void produceConnectionTask();
// 定时线程函数:扫描超过maxIdleTime的空闲连接,进行连接回收
void scannerConnectionTask();
// 成员变量
string _ip; // mysql的ip地址
unsigned short _port; // mysql的端口号,默认3306
string _username; // mysql登录用户名
string _password; // mysql登录密码
string _dbname; // 连接的数据库名称
int _initSize; // 连接池的初始连接量
int _maxSize; // 连接池的最大连接量
int _maxIdleTime; // 连接池最大空闲时间(秒)
int _connectionTimeout; // 连接池获取连接的超时时间(毫秒)
// 存储mysql连接的队列
queue<Connection*> _connectionQue;
// 维护连接队列的线程安全互斥锁
mutex _queueMutex;
// 记录连接所创建的connection连接的总数量,线程安全的原子类型
atomic_int _connectionCnt;
// 设置条件变量,用于连接生产线程和消费线程的通信
condition_variable cv;
};
3.1 连接池单例模式实现
之前在OOP经典设计模式讲解过单例模式,主要有四点需要注意的:
- 构造函数私有化
- 定义该类的唯一实例
- 通过静态方法返回唯一实例
- 删除拷贝构造和赋值运算符重载
DBConnectionPool.cpp
中实现getConnectionPool
// 线程安全的懒汉单例模式的函数接口
ConnectionPool* ConnectionPool::getConnectionPool()
{
// !!!函数内的静态局部变量天然线程安全,编译器会自动lock/unlock
static ConnectionPool pool;
return &pool;
}
3.2 实现加载配置项
源文件中添加添加mysql.ini
# 数据库连接池的配置文件
ip = 127.0.0.1
port = 3306
username = root
password = 123456
dbname = chat
initSize = 10
maxSize = 1024
# 最大空闲时间默认单位是秒
maxIdleTime = 60
#连接超时时间单位是毫秒
connectionTimeOut = 100
DBConnectionPool.cpp
中实现loadConfigFile
// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile()
{
ifstream file("mysql.ini"); // 对象生命周期结束时自动关闭文件
if (!file.is_open())
{
LOG("mysql.ini file is not exist!");
return false;
}
string line;
while (getline(file, line))
{
// 去掉行首的空白字符
line.erase(0, line.find_first_not_of(" \t\n\r\f\v"));
// 跳过注释行和空行
if (line.empty() || line[0] == '#')
continue;
istringstream iss(line);
string key, value;
// '='之前的读给key,'='之后的读给value
if (getline(iss, key, '=') && getline(iss, value))
{
// 去掉key和value两端的空白字符
key.erase(0, key.find_first_not_of(" \t\n\r\f\v"));
key.erase(key.find_last_not_of(" \t\n\r\f\v") + 1);
value.erase(0, value.find_first_not_of(" \t\n\r\f\v"));
value.erase(value.find_last_not_of(" \t\n\r\f\v") + 1);
if (key == "ip")
_ip = value;
else if (key == "port")
_port = std::stoi(value);
else if (key == "username")
_username = value;
else if (key == "password")
_password = value;
else if (key == "dbname")
_dbname = value;
else if (key == "initSize")
_initSize = std::stoi(value);
else if (key == "maxSize")
_maxSize = std::stoi(value);
else if (key == "maxIdleTime")
_maxIdleTime = std::stoi(value);
else if (key == "connectionTimeOut")
_connectionTimeout = std::stoi(value);
}
}
return true;
}
3.3 连接池的构造函数
DBConnectionPool.cpp
中实现ConnectionPool
构造函数
// 连接池的构造
ConnectionPool::ConnectionPool()
{
// 加载配置项
if (!loadConfigFile())
return;
// 创建初始数量的连接
for (int i = 0; i < _initSize; ++i)
{
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
_connectionQue.push(p);
_connectionCnt++;
}
// 启动一个新的线程,作为连接的生产者
thread producer(bind(&ConnectionPool::produceConnectionTask, this));
producer.detach();
}
3.4 实现生产者
DBConnectionPool.cpp
中实现produceConnectionTask
// 生产者:运行在独立的线程中,专门负责生产新连接
void ConnectionPool::produceConnectionTask()
{
while (true)
{
unique_lock<mutex> lck(_queueMutex);
while (!_connectionQue.empty())
cv.wait(lck); // 若队列不空,此处生产线程进入等待状态
// 连接数量没有到达上限,继续创建新的连接
if (_connectionCnt < _maxSize)
{
// 同构造函数中的逻辑
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
_connectionQue.push(p);
_connectionCnt++;
}
// 通知消费者线程,可以消费连接了
cv.notify_all();
}
}
3.5 实现消费者
DBConnectionPool.cpp
中实现getConnection
// 消费者:从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
unique_lock<mutex> lck(_queueMutex);
while (_connectionQue.empty())
{
if (cv_status::timeout ==
cv.wait_for(lck, chrono::milliseconds(_connectionTimeout)))
{
LOG("获取空闲连接超时了...获取连接失败!");
return nullptr;
}
}
/*
shared_ptr智能指针析构时,会把connection资源直接delete掉,相当于
调用connection的析构函数,connection就被close掉了。
所以这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到queue当中
*/
shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection* pcon)
{
// 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
unique_lock<mutex> lock(_queueMutex);
_connectionQue.push(pcon);
});
_connectionQue.pop();
cv.notify_all(); // 消费完连接以后,通知生产者线程检查一下,看一下如果队列依旧为空,则继续生产连接
return sp;
}
3.6 实现回收超过最大空闲时间的连接
在Connection.h
的Connection
类中增加一个起始时间变量和成员方法
class Connection
{
...
public:
// 刷新一下连接的起始的空闲时间点
void refreshStartTime() { _startTime = clock(); }
// 返回存活的时间
clock_t getAliveTime() const { return clock() - _startTime; }
private:
clock_t _startTime; // 记录进入队列的起始时间
...
};
要在进入队列的时候记录连接的起始时间,ConnectionPool
的构造函数和生产者的代码部分有连接进入队列的操作,分别把这两处的代码进行如下的修改:
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
p->refreshStartTime(); // 记录连接起始时间
_connectionQue.push(p);
_connectionCnt++;
消费者的代码中也有归还连接的操作,也需要加一下:
shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection* pcon)
{
// 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
unique_lock<mutex> lock(_queueMutex);
pcon->refreshStartTime(); // 记录连接起始时间
_connectionQue.push(pcon);
});
在ConnectionPool
构造函数中要启动一个定时线程,扫描超过maxIdleTime
的空闲连接,进行多余的连接回收
ConnectionPool::ConnectionPool()
{
...
// 启动一个新的定时线程,扫描超过maxIdleTime的空闲连接,进行多余的连接回收
thread scanner(bind(&ConnectionPool::scannerConnectionTask, this));
scanner.detach();
}
DBConnectionPool.cpp
中实现scannerConnectionTask
// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
void ConnectionPool::scannerConnectionTask()
{
while (true)
{
// 通过sleep模拟定时效果(秒)
this_thread::sleep_for(chrono::seconds(_maxIdleTime));
// 扫描整个队列,释放多余的连接
unique_lock<mutex> lock(_queueMutex);
while (_connectionCnt > _initSize)
{
Connection* p = _connectionQue.front(); // 取出队头(队尾入队,队头是时间最长的那一个)
if (p->getAliveTime() >= (_maxIdleTime * CLOCKS_PER_SEC)) //ctime中clock计算的是时钟周期(clock ticks)
{
_connectionQue.pop();
_connectionCnt--;
delete p; // 调用~Connection()释放连接
}
else
break; // 队头的连接没有超过_maxIdleTime,其它的连接肯定没有
}
}
}
四、连接池压力测试
单线程未使用连接池测试:
// 1000次
clock_t begin = clock();
for (int i = 0; i < 1000; ++i)
{
Connection conn;
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
conn.update(sql); //更新sql语句
}
clock_t end = clock();
cout << (end - begin) << "ms" << endl;
单线程使用连接池测试:
// 1000次
clock_t begin = clock();
ConnectionPool* cp = ConnectionPool::getConnectionPool();
for (int i = 0; i < 1000; ++i)
{
shared_ptr<Connection> sp = cp->getConnection();
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
sp->update(sql);
}
clock_t end = clock();
cout << (end - begin) << "ms" << endl;
多线程未使用连接池测试:
// 下面各线程都是同一个用户名密码,大量并发会出问题,要先连接一下就行了
Connection conn;
conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
clock_t begin = clock();
thread t1([]() {
for (int i = 0; i < 1250; ++i)
{
Connection conn;
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
conn.update(sql);
}
});
thread t2([]() {
for (int i = 0; i < 1250; ++i)
{
Connection conn;
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
conn.update(sql);
}
});
thread t3([]() {
for (int i = 0; i < 1250; ++i)
{
Connection conn;
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
conn.update(sql);
}
});
thread t4([]() {
for (int i = 0; i < 1250; ++i)
{
Connection conn;
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
conn.connect("127.0.0.1", 3306, "root", "123456", "chat");
conn.update(sql);
}
});
t1.join();
t2.join();
t3.join();
t4.join();
clock_t end = clock();
cout << (end - begin) << "ms" << endl;
多线程使用连接池测试
clock_t begin = clock();
ConnectionPool* cp = ConnectionPool::getConnectionPool();
thread t1([=]() {
for (int i = 0; i < 2500; ++i)
{
shared_ptr<Connection> sp = cp->getConnection();
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
sp->update(sql);
}
});
thread t2([=]() {
for (int i = 0; i < 2500; ++i)
{
shared_ptr<Connection> sp = cp->getConnection();
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
sp->update(sql);
}
});
thread t3([=]() {
for (int i = 0; i < 2500; ++i)
{
shared_ptr<Connection> sp = cp->getConnection();
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
sp->update(sql);
}
});
thread t4([=]() {
for (int i = 0; i < 2500; ++i)
{
shared_ptr<Connection> sp = cp->getConnection();
string sql = "insert into user(name,age,sex) values('zhang san', 20, 'male');";
sp->update(sql);
}
});
t1.join();
t2.join();
t3.join();
t4.join();
clock_t end = clock();
cout << (end - begin) << "ms" << endl;
压力测试结果:
数据总量 | 未使用连接池花费时间 | 使用连接池花费时间 | ||
---|---|---|---|---|
单线程 | 四线程 | 单线程 | 四线程 | |
1000 | 9916ms | 4793ms | 1895ms | 792ms |
5000 | 43621ms | 19913ms | 7842ms | 3510ms |
10000 | 87796ms | 37590ms | 14522ms | 7607ms |
可以看到,使用连接池的提升是非常明显的!
五、完整源码下载
MySQL数据库连接池项目代码