Druid连接池中的驱逐空闲线程流程主要涉及以下几个步骤:
1. 创建和启动驱逐线程
createAndStartDestroyThread
方法:此方法负责创建和启动负责驱逐空闲连接的线程。如果提供了destroyScheduler
(一个调度器),则使用该调度器定期执行DestroyTask
任务;如果没有提供,则创建一个DestroyConnectionThread
线程并启动。
2. 驱逐线程的运行逻辑 (DestroyConnectionThread
)
run
方法:驱逐线程的主要逻辑在这里实现。- 初始时,调用
initedLatch.countDown()
表示线程已初始化。 - 进入无限循环,根据
timeBetweenEvictionRunsMillis
的值决定休眠时间,如果没有设置则默认休眠1000毫秒。 - 检查线程是否被中断,如果是,则退出循环。
- 调用
destoryTask.run()
执行实际的驱逐任务。
- 初始时,调用
3. 驱逐任务的执行逻辑 (DestroyTask
)
run
方法:执行驱逐任务。- 调用
shrink(true)
方法检查并驱逐空闲连接。 - 如果启用了移除被遗弃的连接,则调用
removeAbandoned()
方法处理。
- 调用
4. 执行 shrink
方法
- 参数:
shrink
方法接受两个参数,checkTime
和keepAlive
,分别用于控制是否检查连接的空闲时间以及是否执行连接的保活检测。- 锁定:首先尝试获取锁,如果被中断则返回。
- 初始化变量:初始化一些变量,如
evictCount
(待驱逐连接数)和keepAliveCount
(保活连接数)。 - 遍历连接:遍历连接池中的所有连接,根据
checkTime
参数决定是否检查连接的空闲时间。- 物理超时:如果连接的物理超时时间超过
phyTimeoutMillis
,则将连接添加到待驱逐列表。 - 空闲时间检查:如果连接的空闲时间超过
minEvictableIdleTimeMillis
或小于maxEvictableIdleTimeMillis
,则将连接添加到待驱逐列表。 - 保活检查:如果
keepAlive
为true
且连接的空闲时间超过keepAliveBetweenTimeMillis
,则将连接添加到保活列表。
- 物理超时:如果连接的物理超时时间超过
- 更新连接池:如果待驱逐或保活列表中有连接,则更新连接池数组,移除待驱逐的连接,并补充新的连接以保持连接池的大小。
- 释放锁:最后释放锁。
5. 驱逐空闲连接
- 执行
evictConnections
:遍历待驱逐连接列表,关闭连接并更新统计信息。
6. 执行保活检查
- 执行
keepAliveConnections
:遍历保活连接列表,对每个连接执行保活检查。- 验证连接:尝试验证连接的有效性,如果连接无效,则标记为丢弃。
- 重新放入连接池:如果连接有效,则尝试将其重新放回连接池中。
- 丢弃连接:如果连接被标记为丢弃,则关闭连接并更新统计信息。
7. 补充连接池
needFill
检查:如果连接池中的连接数少于minIdle
,则尝试补充新的连接。
通过以上步骤,Druid连接池能够有效地管理空闲连接,确保连接池的健康和高效运行。
##源码
public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
if (needFill) {
lock.lock();
try {
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}
public class DestroyConnectionThread extends Thread {
public DestroyConnectionThread(String name){
super(name);
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
for (;;) {
// 从前面开始删除
try {
if (closed) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
destoryTask.run();
} catch (InterruptedException e) {
break;
}
}
}
}
public class DestroyTask implements Runnable {
@Override
public void run() {
shrink(true);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
synchronized (activeConnections) {
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext();) {
DruidPooledConnection pooledConnection = iter.next();
if (pooledConnection.isRunning()) {
continue;
}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
if (timeMillis >= removeAbandonedTimeoutMillis) {
iter.remove();
pooledConnection.setTraceEnable(false);
abandonedList.add(pooledConnection);
}
}
}
if (abandonedList.size() > 0) {
for (DruidPooledConnection pooledConnection : abandonedList) {
synchronized (pooledConnection) {
if (pooledConnection.isDisable()) {
continue;
}
}
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
if (isLogAbandoned()) {
StringBuilder buf = new StringBuilder();
buf.append("abandon connection, open stackTrace\n");
StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}
LOG.error(buf.toString());
}
}
}
return removeCount;
}
protected void createAndStartDestroyThread() {
destoryTask = new DestroyTask();
if (destroyScheduler != null) {
long period = timeBetweenEvictionRunsMillis;
if (period <= 0) {
period = 1000;
}
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destoryTask, period, period,
TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
}
String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
destroyConnectionThread = new DestroyConnectionThread(threadName);
destroyConnectionThread.start();
}