由于个人能力有限,本文章仅仅代表本人想法,若有不对请即时指出,若有侵权,请联系本人。
1 背景
工作中引入druid来管理数据源连接,由于数据源每隔10分钟强制管理空闲超过10分钟的连接,导致每隔10分钟出现1次获取不到有效连接异常。业务请求量非常少(1h可能来一次请求)。因此,研究了一下druid源码,以及相应的解决方案。
(1)设置maxEvictableIdleTimeMillis为300000,这样5分钟之后强制剔除空闲超过5分钟的连接。
新来的请求重新建立新的连接。
优点: 适合定时任务或者请求量特别特别少的业务场景
(2)保活
keepAlive: true
keepAliveBetweenTimeMillis: 120000
优点: 持续保存有效连接,及时响应业务请求
缺点: 持有成本
2 技术实战
2.1 druid引入以及默认配置
引入maven
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.23</version>
</dependency>
// spi融入到springboot框架
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfig
ure
@Configuration
@ConditionalOnProperty(name = "spring.datasource.type",
havingValue = "com.alibaba.druid.pool.DruidDataSource",
matchIfMissing = true)
@ConditionalOnClass(DruidDataSource.class)
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
@EnableConfigurationProperties({DruidStatProperties.class, DataSourceProperties.class})
@Import({DruidSpringAopConfiguration.class,
DruidStatViewServletConfiguration.class,
DruidWebStatFilterConfiguration.class,
DruidFilterConfiguration.class})
public class DruidDataSourceAutoConfigure {
private static final Logger LOGGER = LoggerFactory.getLogger(DruidDataSourceAutoConfigure.class);
@Bean
@ConditionalOnMissingBean({DruidDataSourceWrapper.class,
DruidDataSource.class,
DataSource.class})
public DruidDataSourceWrapper dataSource() {
LOGGER.info("Init DruidDataSource");
return new DruidDataSourceWrapper();
}
}
@ConfigurationProperties("spring.datasource.druid")
public class DruidDataSourceWrapper extends DruidDataSource implements InitializingBean {
xxx
}
// 查看DruidAbstractDataSource类的属性
// 默认初始化连接池=0
public static final int DEFAULT_INITIAL_SIZE = 0;
// 默认最大连接池=6
public static final int DEFAULT_MAX_ACTIVE_SIZE = 8;
// 默认最大的空闲连接池=8
public static final int DEFAULT_MAX_IDLE = 8;
// 默认最小的空闲连接池=0
public static final int DEFAULT_MIN_IDLE = 0;
// 默认最长的获取连接等待时间-1
public static final int DEFAULT_MAX_WAIT = -1;
// 默认validation_query=null
public static final String DEFAULT_VALIDATION_QUERY = null;
// 默认当应用向连接池申请连接时,连接池不判断这条连接是否是可用的。
public static final boolean DEFAULT_TEST_ON_BORROW = false;
// 默认当一个连接使用完归还到连接池时不进行验证
public static final boolean DEFAULT_TEST_ON_RETURN = false;
// 默认进行空闲时检测
public static final boolean DEFAULT_WHILE_IDLE = true;
// 默认检查空闲连接的频率 1min
public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 60 * 1000L;
// 默认连接出错后重试时间间隔 0.5s
public static final long DEFAULT_TIME_BETWEEN_CONNECT_ERROR_MILLIS = 500;
public static final int DEFAULT_NUM_TESTS_PER_EVICTION_RUN = 3;
public static final int DEFAULT_TIME_CONNECT_TIMEOUT_MILLIS = 10_000;
// 默认连接超时时间10s
public static final int DEFAULT_TIME_SOCKET_TIMEOUT_MILLIS = 10_000;
// 默认剔除空闲连接最小的等待时间
public static final long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 1000L * 60L * 30L;
// 默认剔除空闲连接最大的等待时间
public static final long DEFAULT_MAX_EVICTABLE_IDLE_TIME_MILLIS = 1000L * 60L * 60L * 7;
// 默认物理连接超时时间
public static final long DEFAULT_PHY_TIMEOUT_MILLIS = -1;
// 默认自动提交事务
protected volatile boolean defaultAutoCommit = true;
2.2 项目初始化执行
@Bean
@ConditionalOnMissingBean({DruidDataSourceWrapper.class,
DruidDataSource.class,
DataSource.class})
public DruidDataSourceWrapper dataSource() {
LOGGER.info("Init DruidDataSource");
return new DruidDataSourceWrapper();
}
public DruidDataSource() {
this(false);// 默认非公平锁
}
public DruidDataSource(boolean fairLock) {
super(fairLock);
// 接受从系统参数传递的配置
configFromPropeties(System.getProperties());
}
// 初始化非公平锁
public DruidAbstractDataSource(boolean lockFair) {
lock = new ReentrantLock(lockFair);
notEmpty = lock.newCondition();
empty = lock.newCondition();
}
@ConfigurationProperties("spring.datasource.druid")
public class DruidDataSourceWrapper extends DruidDataSource implements InitializingBean {
xxx
@Override
public void afterPropertiesSet() throws Exception {
xxx
init();//进行初始化,这时候会调用com.alibaba.druid.pool.DruidDataSource#init
}
xxx
}
public void init() throws SQLException {
if (inited) {
return;
}
// bug fixed for dead lock, for issue #2980
DruidDriver.getInstance();
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
boolean init = false;
try {
if (inited) {
return;
}
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
this.id = DruidDriver.createDataSourceId();
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
connectionIdSeedUpdater.addAndGet(this, delta);
statementIdSeedUpdater.addAndGet(this, delta);
resultSetIdSeedUpdater.addAndGet(this, delta);
transactionIdSeedUpdater.addAndGet(this, delta);
}
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
}
initTimeoutsFromUrlOrProperties();
for (Filter filter : filters) {
filter.init(this);
}
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
}
DbType dbType = DbType.of(this.dbTypeName);
if (JdbcUtils.isMysqlDbType(dbType)) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
}
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
if (keepAlive && keepAliveBetweenTimeMillis <= timeBetweenEvictionRunsMillis) {
throw new SQLException("keepAliveBetweenTimeMillis must be greater than timeBetweenEvictionRunsMillis");
}
if (this.driverClass != null) {
this.driverClass = driverClass.trim();
}
initFromSPIServiceLoader();
resolveDriver();
initCheck();
this.netTimeoutExecutor = new SynchronousExecutor();
initExceptionSorter();
initValidConnectionChecker();
validationQueryCheck();
if (isUseGlobalDataSourceStat()) {
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbTypeName);
}
} else {
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
}
dataSourceStat.setResetStatEnable(this.resetStatEnable);
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];
nullConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
if (createScheduler != null && asyncInit) {
for (int i = 0; i < initialSize; ++i) {
submitCreateTask(true);
}
} else if (!asyncInit) {
// init connections
while (poolingCount < initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
}
createAndLogThread();
createAndStartCreatorThread();
createAndStartDestroyThread();
// await threads initedLatch to support dataSource restart.
if (createConnectionThread != null) {
createConnectionThread.getInitedLatch().await();
}
if (destroyConnectionThread != null) {
destroyConnectionThread.getInitedLatch().await();
}
init = true;
initedTime = new Date();
registerMbean();
if (connectError != null && poolingCount == 0) {
throw connectError;
}
if (keepAlive) {
if (createScheduler != null) {
// async fill to minIdle
for (int i = 0; i < minIdle - initialSize; ++i) {
submitCreateTask(true);
}
} else {
empty.signal();
}
}
} catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (Error e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} finally {
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg += ",";
msg += this.name;
}
msg += "} inited";
LOG.info(msg);
}
}
}
2.3 执行回收空闲连接
public class DestroyConnectionThread extends Thread {
xxx
public void run() {
initedLatch.countDown();
for (; !Thread.currentThread().isInterrupted(); ) {
// 从前面开始删除
try { // 若closed 为true,直接break停止执行
if (closed || closing) {
break;
}
// 每隔timeBetweenEvictionRunsMillis 执行一次
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {//每隔1s执行一次
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
}
public class DestroyTask implements Runnable {
public DestroyTask() {
}
@Override
public void run() {
// 执行回收空闲连接
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
// checkTime为true, keepalive默认为false
public void shrink(boolean checkTime, boolean keepAlive) {
if (poolingCount == 0) {
return;
}
final Lock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// remaining is the position of the next connection should be retained in the pool.
int remaining = 0;
int i = 0;
for (; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis) {
break;
}
// 当空闲时间 > 最小空闲时间
if (idleMillis >= minEvictableIdleTimeMillis) {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
// 当空闲时间 > 最大空闲时间
} else if (idleMillis > maxEvictableIdleTimeMillis) {
// 放到剔除空闲连接数组中,并且剔除数量+1
evictConnections[evictCount++] = connection;
continue;
}
}
// 若开启了保活,并且空闲连接 >= 保活间隔时间
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis
&& currentTimeMillis - connection.lastKeepTimeMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
} else {
if (i != remaining) {
// move the connection to the new position for retaining it in the pool.
connections[remaining] = connection;
}
remaining++;
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
// shrink connections by HotSpot intrinsic function _arraycopy for performance optimization.
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
int breakedCount = poolingCount - i;
if (breakedCount > 0) {
// retains the connections that start at the break position.
System.arraycopy(connections, i, connections, remaining, breakedCount);
remaining += breakedCount;
}
// clean the old references of the connections that have been moved forward to the new positions.
System.arraycopy(nullConnections, 0, connections, remaining, removeCount);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
// 遍历所有需要剔除的空闲连接数组,将连接进行释放
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
// use HotSpot intrinsic function _arraycopy for performance optimization.
System.arraycopy(nullConnections, 0, evictConnections, 0, evictConnections.length);
}
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holder = keepAliveConnections[i];
Connection connection = holder.getConnection();
holder.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
keepAliveCheckErrorLast = error;
keepAliveCheckErrorCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
}
boolean discard = !validate;
if (validate) {
holder.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holder, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception error) {
discardErrorLast = error;
discardErrorCountUpdater.incrementAndGet(DruidDataSource.this);
if (LOG.isErrorEnabled()) {
LOG.error("discard connection error", error);
}
}
if (holder.socket != null) {
try {
holder.socket.close();
} catch (Exception error) {
discardErrorLast = error;
discardErrorCountUpdater.incrementAndGet(DruidDataSource.this);
if (LOG.isErrorEnabled()) {
LOG.error("discard connection error", error);
}
}
}
lock.lock();
try {
holder.discard = true;
discardCount++;
if (activeCount + poolingCount + createTaskCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
// use HotSpot intrinsic function _arraycopy for performance optimization.
System.arraycopy(nullConnections, 0, keepAliveConnections, 0, keepAliveConnections.length);
}
if (needFill) {
lock.lock();
try {
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
emptySignal(fillCount);
} finally {
lock.unlock();
}
} else if (fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}
2.3 全部收回连接后接受请求重新创建连接
@Override
public DruidPooledConnection getConnection() throws SQLException {
return getConnection(maxWait);
}
打开druid监控页面,可以观察连接池的连接数量变化。当没有请求后,再过maxEvictableIdleTimeMillis 时间,发现连接池的连接数=0
2.4 设置keepAlive=true
keepAlive: true
keepAliveBetweenTimeMillis: 120000
timeBetweenEvictionRunsMillis: 5000 #关闭空闲连接间隔 5s
minEvictableIdleTimeMillis: 120000 #连接保持空闲而不被驱逐的最小时间 2分钟
maxEvictableIdleTimeMillis: 420000 #连接保持空闲而不被驱逐的最大时间 5分钟
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true; //需要重建物理连接,保持minIdle数量
}
登录druid控制台http://localhost:8080/druid/index.html
可以看到连接一直保持在minIdle量