文章目录
- 前言
- 一、PooledDataSourceFactory
- 二、获取连接
- 三、归还连接
前言
其实大部分连接池的代码都大同小异,总体获取连接,归还连接逻辑大都相同。希望通过阅读本文章,能给你带来帮助。
测试用例
public void testMybatis()throws Exception{
SqlSessionFactoryBuilder sqlSessionFactoryBuilder=new SqlSessionFactoryBuilder();
org.springframework.core.io.ClassPathResource classPathResource=new ClassPathResource("org/apache/ibatis/user/mybatis.xml");
InputStream inputStream = classPathResource.getInputStream();
// 1.读取配置文件获取SqlSessionFactory
SqlSessionFactory sqlSessionFactory= sqlSessionFactoryBuilder.build(inputStream);
// 2.获取sqlsession
SqlSession sqlSession = sqlSessionFactory.openSession();
// 3.执行获取结果
User user = new User(1L, null, null, null);
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
List<User> users = mapper.selectUser(user);
mapper.selectUser(user);
System.out.println(users);
}
在执行下列语句时,mybatis会去读取配置文件,并创建我们的Configuration,并会给Configuration设置相对的属性值。
SqlSessionFactory sqlSessionFactory= sqlSessionFactoryBuilder.build(inputStream);
在Configuration初始化时,就会设置typeAliasRegistry.registerAlias(“POOLED”, PooledDataSourceFactory.class);所以本节主要也就是根据PooledDataSourceFactory去讲解
public Configuration() {
// 事务工厂类型别名
typeAliasRegistry.registerAlias("JDBC", JdbcTransactionFactory.class);
typeAliasRegistry.registerAlias("MANAGED", ManagedTransactionFactory.class);
// 数据源工厂类型别名
typeAliasRegistry.registerAlias("JNDI", JndiDataSourceFactory.class);
typeAliasRegistry.registerAlias("POOLED", PooledDataSourceFactory.class);
typeAliasRegistry.registerAlias("UNPOOLED", UnpooledDataSourceFactory.class);
// 缓存类型别名
typeAliasRegistry.registerAlias("PERPETUAL", PerpetualCache.class);
typeAliasRegistry.registerAlias("FIFO", FifoCache.class);
typeAliasRegistry.registerAlias("LRU", LruCache.class);
typeAliasRegistry.registerAlias("SOFT", SoftCache.class);
typeAliasRegistry.registerAlias("WEAK", WeakCache.class);
// 数据库厂商标识类型别名
typeAliasRegistry.registerAlias("DB_VENDOR", VendorDatabaseIdProvider.class);
// 语言驱动类型别名
typeAliasRegistry.registerAlias("XML", XMLLanguageDriver.class);
typeAliasRegistry.registerAlias("RAW", RawLanguageDriver.class);
// 日志实现类型别名
typeAliasRegistry.registerAlias("SLF4J", Slf4jImpl.class);
typeAliasRegistry.registerAlias("COMMONS_LOGGING", JakartaCommonsLoggingImpl.class);
typeAliasRegistry.registerAlias("LOG4J", Log4jImpl.class);
typeAliasRegistry.registerAlias("LOG4J2", Log4j2Impl.class);
typeAliasRegistry.registerAlias("JDK_LOGGING", Jdk14LoggingImpl.class);
typeAliasRegistry.registerAlias("STDOUT_LOGGING", StdOutImpl.class);
typeAliasRegistry.registerAlias("NO_LOGGING", NoLoggingImpl.class);
// 代理工厂类型别名
typeAliasRegistry.registerAlias("CGLIB", CglibProxyFactory.class);
typeAliasRegistry.registerAlias("JAVASSIST", JavassistProxyFactory.class);
// 默认语言驱动
languageRegistry.setDefaultDriverClass(XMLLanguageDriver.class);
// 注册语言驱动
languageRegistry.register(RawLanguageDriver.class);
}
一、PooledDataSourceFactory
PooledDataSourceFactory
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
public PooledDataSourceFactory() {
this.dataSource = new PooledDataSource();
}
}
PooledDataSource
public class PooledDataSource implements DataSource {
private static final Log log = LogFactory.getLog(PooledDataSource.class);
private final PoolState state = new PoolState(this);
private final UnpooledDataSource dataSource;
// 连接池参数配置,最大连接数,
protected int poolMaximumActiveConnections = 10;
// 最大空闲连接数
protected int poolMaximumIdleConnections = 5;
// 最大检出时间
protected int poolMaximumCheckoutTime = 20000;
// 最大等待时间
protected int poolTimeToWait = 20000;
// 最大坏的连接数
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 服务器是否ping的通MySQL数据库
protected String poolPingQuery = "NO PING QUERY SET";
protected boolean poolPingEnabled;
protected int poolPingConnectionsNotUsedFor;
private int expectedConnectionTypeCode;
}
PoolState 当前运行的数据源状态
public class PoolState {
protected PooledDataSource dataSource;
// 空闲连接
protected final List<PooledConnection> idleConnections = new ArrayList<>();
// 活跃连接
protected final List<PooledConnection> activeConnections = new ArrayList<>();
// 请求次数
protected long requestCount;
// 累计请求时间
protected long accumulatedRequestTime;
// 请求使用时间
protected long accumulatedCheckoutTime;
// 过期未还连接数量
protected long claimedOverdueConnectionCount;
// 过期未还连接数量连接时间
protected long accumulatedCheckoutTimeOfOverdueConnections;
// 累计等待时长
protected long accumulatedWaitTime;
// 获取连接等待数量
protected long hadToWaitCount;
// 坏的连接数量
protected long badConnectionCount;
}
以下代码可以配置我们的数据库连接池
<dataSource type="org.apache.ibatis.datasource.pooled.PooledDataSourceFactory">
<property name="driver" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolMaximumActiveConnections" value="20"/>
<property name="poolMaximumIdleConnections" value="5"/>
<property name="poolMaximumCheckoutTime" value="20000"/>
<property name="poolTimeToWait" value="20000"/>
<property name="poolPingEnabled" value="true"/>
<property name="poolPingQuery" value="SELECT 1"/>
<property name="poolPingConnectionsNotUsedFor" value="3600000"/>
</dataSource>
二、获取连接
在执行sqlSessionFactory.openSession();会去数据库连接池获取我们的连接
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
SqlSession sqlSession = sqlSessionFactory.openSession();
PooledDataSource.getConnection()获取连接(这里就直接看获取连接的代码)
public Connection getConnection() throws SQLException {
// 这里返回的是被代理过后的Connection
return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
}
PooledDataSource.popConnection()
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
// 计算获取时间
long t = System.currentTimeMillis();
// 坏连接数量
int localBadConnectionCount = 0;
while (conn == null) {
// 进行上锁 ,
lock.lock();
try {
// 1.如果空闲的连接不为空
if (!state.idleConnections.isEmpty()) {
// 2. 从空闲连接拿一个,并且从空闲连接集合中删除
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
// 2.如果空闲的连接为空,且当前活跃连接数量 < 配置的最大连接数 (池子还没满状态下)
} else if (state.activeConnections.size() < poolMaximumActiveConnections) {
// 3.就直接创建一个新的连接
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
// 2.如果空闲的连接为空,且当前活跃连接数量 >= 配置的最大连接数 (池子已经满了,装不下了)
} else {
// 3. 在当前活跃连接池子中,拿到最开始进入活跃连接池的连接时间
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
// 4.判断最开始在活跃连接是否 > 设置的超时连接。
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// 5.进行一些超时计算,把超时连接从池子remove出去
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
// 6. 进行回滚
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
log.debug("Bad connection. Could not roll back");
}
}
// 7. 把旧的连接进行一个包装,生成一个新的连接PooledConnection conn
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
// 8. 把旧的连接设置为不可用
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else {
// 判断最开始在活跃连接不大于设置的超时连接。那当前线程获取连接就需要进行等待
try {
if (!countedWait) {
state.hadToWaitCount++;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
condition.await(poolTimeToWait, TimeUnit.MILLISECONDS);
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
// set interrupt flag
Thread.currentThread().interrupt();
break;
}
}
}
if (conn != null) {
//ping一下当前连接是否可用,
if (conn.isValid()) {
// 判断当前连接是手动提交,还是自动提交。
if (!conn.getRealConnection().getAutoCommit()) {
// 如果是手动提交,则直接进行回滚
conn.getRealConnection().rollback();
}
// 可用连接属性值设置,并加入活跃连接池中,记录请求次数,使用开始时间等等。
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode()
+ ") was returned from the pool, getting another connection.");
}
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
} finally {
lock.unlock();
}
}
return conn;
}
情况一:存在空闲连接
- 对当前线程进行上锁
- 如果有空闲连接,则直接从空闲连接当中获取第一个连接,并移除。
- ping一下当前连接是否可用,判断当前连接是手动提交,还是自动提交。(手动提交就回滚,保证当前连接是干净的连接)
- 可用连接属性值设置,并加入活跃连接池中,记录请求次数,使用开始时间等等。
- 释放锁
情况二:不存在空闲连接,且活跃连接池数量 < 配置的最大连接数(池子还没满状态下)
- 对当前线程进行上锁
- 如果没有空闲连接,且当前活跃连接数量 < 配置的最大连接数 (池子还没满状态下)
- 直接创建一个新的连接
- ping一下当前连接是否可用,判断当前连接是手动提交,还是自动提交。(手动提交就回滚,保证当前连接是干净的连接)
- 可用连接属性值设置,并加入活跃连接池中,记录请求次数,使用开始时间等等。
- 释放锁
情况三:不存在空闲连接,且当前活跃连接数量 >= 配置的最大连接数 (池子已经满了,装不下了)
- 对当前线程进行上锁
- 如果没有空闲连接,且当前活跃连接数量 >= 配置的最大连接数 (池子已经满了,装不下了)
- 判断当前连接使用时间是否超时,如果没超时就进入等待状态。
- 如果当前连接超时了,就记录超时时间,把超时连接从活跃连接池移除。对当前超时连接进行回滚操作。
- 把超时的连接包装成一个新的连接PooledConnection,并把超时的连接设置为不可用。
- ping一下当前连接是否可用,判断当前连接是手动提交,还是自动提交。(手动提交就回滚,保证当前连接是干净的连接)
- 可用连接属性值设置,并加入活跃连接池中,记录请求次数,使用开始时间等等。
- 释放锁
三、归还连接
当我们调用 sqlSession.close()时,会去归还当前连接。
User user = new User(1L, null, null, null);
UserMapper mapper = sqlSession.getMapper(UserMapper.class);
List<User> users = mapper.selectUser(user);
mapper.selectUser(user);
// 手动关闭sqlSession,归还连接
sqlSession.close();
PooledConnection 类,实现了InvocationHandler 接口,大家可以发现这是一个jdk动态代理接口。
如果忘记了动态代理,戳这里
class PooledConnection implements InvocationHandler {
private final Connection realConnection;
private final Connection proxyConnection;
// 构造方法
public PooledConnection(Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
// 这里会进行jdk动态代理,在上面getconnection,获取连接时,就会返回这个被代理过后的Connection。
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}
// invoke方法,当我们的Connection 调用任何方法时,会先调用我们的invoke方法。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
// 查看当前调用方法,如果调用的是close方法,就执行归还连接的逻辑
if (CLOSE.equals(methodName)) {
// 归还连接
dataSource.pushConnection(this);
return null;
}
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
checkConnection();
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
dataSource.pushConnection(this);
protected void pushConnection(PooledConnection conn) throws SQLException {
// 1.上锁
lock.lock();
try {
// 2. 从活跃连接池中移除
state.activeConnections.remove(conn);
// 3. 查看当前连接池是否可以,是否能ping通
if (conn.isValid()) {
// 4. 如果空闲连接数量 < 容器设置最大连接数量
if (state.idleConnections.size() < poolMaximumIdleConnections
&& conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
// 5.进行回滚操作
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
// 6.重新进行包装,把当前连接放入空闲连接当中
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
// 7. 把原有连接设置为不可用
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
condition.signal();
} else {
// 如果当前连接满了,就进行回滚,把当前连接设置为不可用
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
conn.invalidate();
}
} else {
// 如果是坏连接,不用管。
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode()
+ ") attempted to return to the pool, discarding connection.");
}
state.badConnectionCount++;
}
} finally {
// 释放锁
lock.unlock();
}
}
情况一:空闲连接数量 < 容器设置最大连接数量
- 对当前线程进行上锁
- 从活跃连接池中移除,并查看当前连接池是否可以,是否能ping通
- 如果空闲连接数量 < 容器设置最大连接数量
- 进行回滚操作,重新进行包装,把包装后的连接放入空闲连接池当中
- 把原有连接设置为不可用
- 释放锁
情况二:空闲连接数量 >= 容器设置最大连接数量
- 对当前线程进行上锁
- 从活跃连接池中移除,并查看当前连接池是否可以,是否能ping通
- 进行回滚操作
- 把当前连接设置为不可用
- 释放锁
今天母亲节,祝愿天下所有母亲节日快乐,身体安康!!!