目录
- 一、数据库连接池
- 1.1 池化技术
- 1.2 数据库连接池及其作用
- 1.3 不使用数据库连接池
- 1.4 使用数据库连接池
- 1.5 长连接和连接池
- 1.6 数据库连接池运行机制
- 1.7 连接池和线程池的关系
- 二、数据库连接池的设计
- 2.1 mysql 连接池
- 1. 构造函数
- 2. 初始化
- 3. 请求获取连接
- 4. 归还连接
- 5. 析构连接池
- 三、连接池连接数量设置
- 四、测试
- 4.1 使用连接池
- 4.2 不使用连接池
- 4.3 测试结果
一、数据库连接池
1.1 池化技术
在手写线程池一文中,我们简单介绍了什么是线程池、为什么需要线程池、以及如何实现线程池。本文介绍的数据库连接池,与线程池一样,也是利用池化技术来避免频繁创建资源对象,从而减小系统开销。
所谓的池化技术,是一种资源管理技术,旨在通过事先分配和复用资源,提高系统的效率和性能。简单来说,就是通过提前建立一些资源对象,使用时获取,用完之后归还。这样能够减少资源对象的创建次数,提高程序的响应性能,特别是在高并发场景。使用池化技术缓存的资源对象有如下共同特点:
1)对象创建时间长
2)对象创建需要大量资源
3)对象创建后可被重复使用
像常见的线程池、内存池、连接池、对象池都具有以上的共同特点。
1.2 数据库连接池及其作用
创建数据库连接是一个很耗时的操作(后面会具体介绍),因此利用池化技术,在应用程序和数据库服务器之间建立一个缓存区,即数据库连接池。在连接池中缓存预先建立好的数据库连接对象。当应用程序需要访问数据库时,可以从连接池中获取一个可用的数据库连接对象进行操作,并在完成操作后将其归还到连接池中。
作用
1)资源复用
避免了频繁的创建、释放连接引起的性能开销,减少系统消耗。另一方面也增进了系统运行环境的平稳性(减少内存碎片以及数据库临时进程/线程的数量)。
2)更快的系统响应速度
数据库连接池在初始化工作完成后,对于业务请求处理而言,直接利用现有可用的连接,避免了从数据库连接初始化和释放过程的开销,从而缩减了系统整体响应时间。
3)统一的连接管理,避免数据库连接泄露
根据预先设定的连接占用超时,强制收回被占用连接,从而避免了常规数据库连接操作中可能出现的资源泄露。
1.3 不使用数据库连接池
前面介绍了数据库连接池的作用,为了更直观点体现优势所在。我们看看如果不使用数据库连接池时,数据库怎么查询。
对于每条 sql 语句,都要执行下面流程
1)TCP建立连接的三次握手(客户端与MySQL服务器的连接基于TCP协议)
2)MySQL认证的三次握手
3)真正的SQL执行
4)MySQL的关闭
5)TCP的四次握手关闭
可以看到,为了执行一条SQL,需要进行TCP三次握手,Mysql认证、Mysql关闭、TCP四次挥手等其他操作,执行SQL操作在所有的操作占比非常低。
这种方式实现简单,不需要设计连接池。但是缺点比较多
1)网络IO较多。
2)带宽利用率低。
3)QPS较低。
4)频繁创建连接和关闭连接,导致临时对象较多,产生更多的内存碎片。
5)关闭连接后出现大量TIME_WAIT的TCP状态。
1.4 使用数据库连接池
如果使用数据库连接池,在第一次访问的时候,需要建立连接。 但是之后的访问,均会复用之前创建的连接,直接执行SQL语句。
这种连接池的设计较为复杂,但是优点多多:
1)降低网络开销。
2)连接复用。减少连接次数。
3)提升性能。避免了频繁的创建连接。
4)没有TIME_WAIT状态问题。
1.5 长连接和连接池
长连接和连接池都是为了优化数据库连接性能而设计的技术.
1)长连接是指客户端与服务器建立一次连接后,可以重复使用该连接来传输多个请求和响应消息。长连接主要解决单个客户端与服务器之间建立、关闭TCP/IP链接所带来的性能问题。
2)数据库连接池则是在应用程序和数据库服务器之间建立一个缓存区,缓存预先建立好的数据库连接。数据库连接池则更关注于解决应用程序与数据库之间频繁创建、关闭网络链接所带来的性能问题。
连接池内的连接,其实就是长连接,连接池的本质是管理长连接。
1.6 数据库连接池运行机制
1)从连接池获取或创建可用连接;
2)使用完毕,把连接返回给连接池。
3)系统关闭前,断开所有连接并释放连接占用的系统资源。
1.7 连接池和线程池的关系
1)线程池:主动调用任务。当任务队列不为空的时候从队列取任务取执行。
2)连接池:被动被任务使用。当某任务需要操作数据库时,只要从连接池中取出一个连接对象,当任务使用完该连接对象后,将该连接对象放回到连接池中。如果连接池中没有连接对象可以用,那么该任务就必须等待。
连接池和线程池设置数量的关系
1)一般线程池线程数量和连接池连接对象数量一致;
2)线程执行任务完毕的时候归还连接对象;
连接池和线程池在实际应用中往往会一起使用来达到更好的性能优化效果。
二、数据库连接池的设计
连接池的设计思路:
1)连接到数据库,涉及到数据库ip、端口、用户名、密码、数据库名字等;
\quad
~ 连接的操作,每个连接对象都是独立的连接通道,它们是独立的
\quad
~ 配置最小连接数和最大连接数
2)需要一个队列管理他的连接,比如使用list;
3)获取连接对象:
4)归还连接对象;
5)连接池的名字:不同的业务可以设计不同的连接池,根据名字进行区分。比如对于通讯聊天应用,对于存储聊天记录的msgpool,对于存储用户信息的userpool。
2.1 mysql 连接池
1. 构造函数
CDBPool::CDBPool(const char *pool_name, const char *db_server_ip, uint16_t db_server_port,
const char *username, const char *password, const char *db_name, int max_conn_cnt)
{
m_pool_name = pool_name; // 连接池的名字
m_db_server_ip = db_server_ip; // mysql的ip地址
m_db_server_port = db_server_port; // mysql的端口号
m_username = username; // mysql的用户名
m_password = password; // mysql的秘密
m_db_name = db_name; // mysql的数据库名称
m_db_max_conn_cnt = max_conn_cnt; // 最大连接数量
m_db_cur_conn_cnt = MIN_DB_CONN_CNT; // 最小连接数量
}
2. 初始化
// 连接对象初始化
int CDBConn::Init()
{
m_mysql = mysql_init(NULL); // mysql_标准的mysql c client对应的api
if (!m_mysql)
{
log_error("mysql_init failed\n");
return 1;
}
int reconnect = 1;
mysql_options(m_mysql, MYSQL_OPT_RECONNECT, &reconnect); // 配合mysql_ping实现自动重连
mysql_options(m_mysql, MYSQL_SET_CHARSET_NAME, "utf8mb4"); // utf8mb4和utf8区别
// ip 端口 用户名 密码 数据库名
if (!mysql_real_connect(m_mysql, m_pDBPool->GetDBServerIP(), m_pDBPool->GetUsername(), m_pDBPool->GetPasswrod(),
m_pDBPool->GetDBName(), m_pDBPool->GetDBServerPort(), NULL, 0))
{
log_error("mysql_real_connect failed: %s\n", mysql_error(m_mysql));
return 2;
}
return 0;
}
// 连接池的初始化
int CDBPool::Init()
{
// 创建固定最小的连接数量
for (int i = 0; i < m_db_cur_conn_cnt; i++)
{
// 新建连接
CDBConn *pDBConn = new CDBConn(this);
// 初始化连接
int ret = pDBConn->Init();
if (ret)
{
delete pDBConn;
return ret;
}
// 新建立的连接加入容器
m_free_list.push_back(pDBConn);
}
return 0;
}
3. 请求获取连接
/*
*TODO: 增加保护机制,把分配的连接加入另一个队列,这样获取连接时,如果没有空闲连接,
*TODO: 检查已经分配的连接多久没有返回,如果超过一定时间,则自动收回连接,放在用户忘了调用释放连接的接口
* timeout_ms默认为 0死等
* timeout_ms >0 则为等待的时间
*/
CDBConn *CDBPool::GetDBConn(const int timeout_ms)
{
// 加锁
std::unique_lock<std::mutex> lock(m_mutex);
// 是否关闭连接池,关闭则不能获取连接
if(m_abort_request)
{
log_warn("have aboort\n");
return NULL;
}
// 检查连接池池是否有空闲连接
// 1. 连接池没有空闲连接可以用
if (m_free_list.empty())
{
// 检查当前连接数量是否达到最大的连接数量
// 1.1 当前连接数 >= 最大连接数,不能创建新的连接
if (m_db_cur_conn_cnt >= m_db_max_conn_cnt)
{
// 等待的逻辑,看看是否需要超时等待
// 1.1.1 死等,直到有连接可以用 或者 连接池要退出
if(timeout_ms <= 0)
{
log_info("wait ms:%d\n", timeout_ms);
m_cond_var.wait(lock, [this]
{
log_info("wait:%d, size:%lu\n", wait_cout++, m_free_list.size());
// 当 (m_free_list不为空,等到了空闲连接 )或者 (请求释放连接池),这两种情况时退出
return (!m_free_list.empty()) | m_abort_request;
});
}
// 1.1.2 超时等待
else {
// return如果返回 false,继续wait(或者超时), 如果返回true退出wait
// 1.m_free_list不为空
// 2.超时退出
// 3. m_abort_request被置为true,要释放整个连接池
m_cond_var.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] {
log_info("wait_for:%d, size:%lu\n", wait_cout++, m_free_list.size());
return (!m_free_list.empty()) | m_abort_request;
});
// 如果超时退出,且连接池还是没有空闲则退出
if(m_free_list.empty())
{
return NULL;
}
}
// 若连接池要释放,则不能获取资源
if(m_abort_request)
{
log_warn("have aboort\n");
return NULL;
}
}
// 1.2 当前连接数 < 最大连接数,创建连接
else
{
CDBConn *pDBConn = new CDBConn(this); //新建连接
int ret = pDBConn->Init();
if (ret)
{
log_error("Init DBConnecton failed\n\n");
delete pDBConn;
return NULL;
}
else
{
m_free_list.push_back(pDBConn);
m_db_cur_conn_cnt++;
}
}
}
// 2. 连接池有空闲连接可以用
// 获取连接
CDBConn *pConn = m_free_list.front();
// 从空闲队列删除
m_free_list.pop_front();
// 加入到已经被请求的连接
m_used_list.push_back(pConn);
return pConn;
}
4. 归还连接
void CDBPool::RelDBConn(CDBConn *pConn)
{
if(!pConn) {
log_error("pConn is null");
return;
}
std::lock_guard<std::mutex> lock(m_mutex);
list<CDBConn *>::iterator it = m_free_list.begin();
// 检测该连接是否已经归还,避免重复归还
for (; it != m_free_list.end(); it++){
// 说明这个连接已经归还过
if (*it == pConn)
{
break;
}
}
// 还没有归还,插入空闲队列并通知取队列
if (it == m_free_list.end())
{
m_used_list.remove(pConn);
m_free_list.push_back(pConn);
m_cond_var.notify_one(); // 通知取队列
}
// 已经归还过了
else
{
log_error("RelDBConn failed\n"); // 不再次回收连接
}
}
5. 析构连接池
CDBPool::~CDBPool()
{
std::lock_guard<std::mutex> lock(m_mutex);
// 设置关闭标记
m_abort_request = true;
// 通知所有在等待的请求退出
m_cond_var.notify_all();
for (list<CDBConn *>::iterator it = m_free_list.begin(); it != m_free_list.end(); it++)
{
CDBConn *pConn = *it;
delete pConn;
}
m_free_list.clear();
}
三、连接池连接数量设置
1)经验公式: 连接数 = [ ( c p u 核心数 × 2 ) + 有效磁盘数 ] 连接数 = \left[(cpu核心数 \times2) + 有效磁盘数\right] 连接数=[(cpu核心数×2)+有效磁盘数]
# 查看CPU信息(型号)
cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c
# 查看物理CPU个数
cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l
# 查看每个物理cpu核数
cat /proc/cpuinfo| grep "cpu cores"| uniq
# 查看逻辑cpu的个数
cat /proc/cpuinfo| grep "processor"| wc -l
2)对于IO密集型的任务
连接数
=
线程数
=
1
C
P
U
利用率
连接数=线程数=\dfrac{1}{CPU利用率}
连接数=线程数=CPU利用率1
如果任务整体上是一个IO密集型的任务。在处理一个请求的过程中(处理一个任务),总共耗时
100+5=105ms,而其中只有5ms是用于计算操作的(消耗cpu),另外的100ms等待io响应,则CPU利用率为
5
100
+
5
=
4.76
%
\frac{5}{100+5}=4.76\%
100+55=4.76%。
四、测试
4.1 使用连接池
void *workUsePool(void *arg, int id) // 任务
{
// printf("workUsePool id:%d\n", id);
CDBPool *pDBPool = (CDBPool *)arg;
CDBConn *pDBConn = pDBPool->GetDBConn(2000); // 获取连接,超时2000ms
if (pDBConn)
{
bool ret = insertUser(pDBConn, id); // 插入用户信息
if (!ret)
{
printf("insertUser failed\n");
}
pDBPool->RelDBConn(pDBConn); // 然后释放连接
}
else
{
printf("GetDBConn failed\n");
}
// printf("exit id:%d\n", id);
return NULL;
}
// 使用连接池的测试
int testWorkUsePool(int thread_num, int db_maxconncnt, int task_num)
{
const char *db_pool_name = DB_POOL_NAME;
const char *db_host = DB_HOST_IP;
int db_port = DB_HOST_PORT;
const char *db_dbname = DB_DATABASE_NAME;
const char *db_username = DB_USERNAME;
const char *db_password = DB_PASSWORD;
// 每个连接池都对应一个对象
CDBPool *pDBPool = new CDBPool(db_pool_name, db_host, db_port,
db_username, db_password, db_dbname, db_maxconncnt);
if (pDBPool->Init())
{
printf("init db instance failed: %s", db_pool_name);
return -1;
}
CDBConn *pDBConn = pDBPool->GetDBConn(); // 获取连接
if (pDBConn)
{
bool ret = pDBConn->ExecuteDrop(DROP_IMUSER_TABLE); // 删除表
if (ret)
{
// printf("DROP_IMUSER_TABLE ok\n");
}
// 1. 创建表
ret = pDBConn->ExecuteCreate(CREATE_IMUSER_TABLE); // 创建表
if (ret)
{
// printf("CREATE_IMUSER_TABLE ok\n");
}
else
{
printf("ExecuteCreate failed\n");
return -1;
}
}
pDBPool->RelDBConn(pDBConn); // 一定要归还
printf("task_num = %d, thread_num = %d, connection_num:%d, use_pool:1\n",
task_num, thread_num, db_maxconncnt);
ZERO_ThreadPool threadpool;
threadpool.init(thread_num); // 设置线程数量
threadpool.start(); // 启动线程池
uint64_t start_time = get_tick_count();
for (int i = 0; i < task_num; i++)
{
threadpool.exec(workUsePool, (void *)pDBPool, i); //把所有的任务都给线程池
}
cout << "need time0: " << get_tick_count() - start_time << "ms\n";
threadpool.waitForAllDone(); // 等待所有执行万再退出
cout << "need time1: " << get_tick_count() - start_time << "ms\n";
threadpool.stop();
cout << "need time2: " << get_tick_count() - start_time << "ms\n\n";
delete pDBPool;
return 0;
}
4.2 不使用连接池
// 没有用连接池,每次任务的执行都重新初始化连接
void *workNoPool(void *arg, int id)
{
// printf("workNoPool\n");
arg = arg; // 避免警告
const char *db_pool_name = DB_POOL_NAME;
const char *db_host = DB_HOST_IP;
int db_port = DB_HOST_PORT;
const char *db_dbname = DB_DATABASE_NAME;
const char *db_username = DB_USERNAME;
const char *db_password = DB_PASSWORD;
int db_maxconncnt = 1; // 这里就只初始化一个连接
CDBPool *pDBPool = new CDBPool(db_pool_name, db_host, db_port,
db_username, db_password, db_dbname, db_maxconncnt);
if (!pDBPool)
{
printf("workNoPool new CDBPool failed\n");
return NULL;
}
if (pDBPool->Init())
{
printf("init db instance failed: %s\n", db_pool_name);
return NULL;
}
CDBConn *pDBConn = pDBPool->GetDBConn();
if (pDBConn)
{
bool ret = insertUser(pDBConn, id);
if (!ret)
{
printf("insertUser failed\n");
}
}
else
{
printf("GetDBConn failed\n");
}
pDBPool->RelDBConn(pDBConn);
delete pDBPool; // 销毁连接池,实际是销毁连接
return NULL;
}
// 初始化和使用连接池是一样的
int testWorkNoPool(int thread_num, int db_maxconncnt, int task_num)
{
const char *db_pool_name = DB_POOL_NAME;
const char *db_host = DB_HOST_IP;
int db_port = DB_HOST_PORT;
const char *db_dbname = DB_DATABASE_NAME;
const char *db_username = DB_USERNAME;
const char *db_password = DB_PASSWORD;
// 每个连接池都对应一个对象
CDBPool *pDBPool = new CDBPool(db_pool_name, db_host, db_port,
db_username, db_password, db_dbname, db_maxconncnt);
if (pDBPool->Init())
{
printf("init db instance failed: %s", db_pool_name);
return -1;
}
CDBConn *pDBConn = pDBPool->GetDBConn(); // 获取连接
if (pDBConn)
{
bool ret = pDBConn->ExecuteDrop(DROP_IMUSER_TABLE); // 删除表
if (ret)
{
printf("DROP_IMUSER_TABLE ok\n");
}
// 1. 创建表
ret = pDBConn->ExecuteCreate(CREATE_IMUSER_TABLE); // 创建表
if (ret)
{
printf("CREATE_IMUSER_TABLE ok\n");
}
else
{
printf("ExecuteCreate failed\n");
return -1;
}
}
pDBPool->RelDBConn(pDBConn);
printf("task_num = %d, thread_num = %d, connection_num:%d, use_pool:0\n",
task_num, thread_num, db_maxconncnt);
ZERO_ThreadPool threadpool;
threadpool.init(thread_num); // 设置线程数量
threadpool.start(); // 启动线程池
uint64_t start_time = get_tick_count();
for (int i = 0; i < task_num; i++)
{
threadpool.exec(workNoPool, (void *)pDBPool, i); // 主要在于执行函数的区别。
}
cout << "need time0: " << get_tick_count() - start_time << "ms\n";
threadpool.waitForAllDone(); // 等待所有执行万再退出
cout << "need time1: " << get_tick_count() - start_time << "ms\n";
threadpool.stop();
cout << "need time2: " << get_tick_count() - start_time << "ms\n\n";
return 0;
}
4.3 测试结果
从测试结果可以看出
1)使用连接池的效果明显由于不使用连接池。
2)使用连接池时,线程数量不宜过大,超出一定限度,性能反而会下降。