Druid之連接創(chuàng)建及銷毀示例詳解
前言
Druid是阿里開源的數(shù)據(jù)庫連接池,是阿里監(jiān)控系統(tǒng)Dragoon的副產(chǎn)品,提供了強(qiáng)大的可監(jiān)控性和基于Filter-Chain的可擴(kuò)展性。
本篇文章將對(duì)Druid數(shù)據(jù)庫連接池的連接創(chuàng)建和銷毀進(jìn)行分析。分析Druid數(shù)據(jù)庫連接池的源碼前,需要明確幾個(gè)概念。
- Druid數(shù)據(jù)庫連接池中可用的連接存放在一個(gè)數(shù)組connections中;
- Druid數(shù)據(jù)庫連接池做并發(fā)控制,主要靠一把可重入鎖以及和這把鎖關(guān)聯(lián)的兩個(gè)Condition對(duì)象;
public DruidAbstractDataSource(boolean lockFair) {
lock = new ReentrantLock(lockFair);
notEmpty = lock.newCondition();
empty = lock.newCondition();
}
- 連接池沒有可用連接時(shí),應(yīng)用線程會(huì)在notEmpty上等待,連接池已滿時(shí),生產(chǎn)連接的線程會(huì)在empty上等待;
- 對(duì)連接保活,就是每間隔一定時(shí)間,對(duì)達(dá)到了?;铋g隔周期的連接進(jìn)行有效性校驗(yàn),可以將無效連接銷毀,也可以防止連接長(zhǎng)時(shí)間不與數(shù)據(jù)庫服務(wù)端通信。
Druid版本:1.2.11
正文
一. DruidDataSource連接創(chuàng)建
DruidDataSource連接的創(chuàng)建由CreateConnectionThread線程完成,其run() 方法如下所示。
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (; ; ) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
// emptyWait為true表示生產(chǎn)連接線程需要等待,無需生產(chǎn)連接
boolean emptyWait = true;
// 發(fā)生了創(chuàng)建錯(cuò)誤,且池中已無連接,且丟棄連接的統(tǒng)計(jì)沒有改變
// 此時(shí)生產(chǎn)連接線程需要生產(chǎn)連接
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// 池中已有連接數(shù)大于等于正在等待連接的應(yīng)用線程數(shù)
// 且當(dāng)前是非keepAlive場(chǎng)景
// 且當(dāng)前是非連續(xù)失敗
// 此時(shí)生產(chǎn)連接的線程在empty上等待
// keepAlive && activeCount + poolingCount < minIdle時(shí)會(huì)在shrink()方法中觸發(fā)emptySingal()來添加連接
// isFailContinuous()返回true表示連續(xù)失敗,即多次(默認(rèn)2次)創(chuàng)建物理連接失敗
if (poolingCount >= notEmptyWaitThreadCount
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await();
}
// 防止創(chuàng)建超過maxActive數(shù)量的連接
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
}
} catch (InterruptedException e) {
// 省略
} finally {
lock.unlock();
}
PhysicalConnectionInfo connection = null;
try {
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl
+ ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts
&& timeBetweenConnectErrorMillis > 0) {
// 多次創(chuàng)建失敗
setFailContinuous(true);
// 如果配置了快速失敗,就喚醒所有在notEmpty上等待的應(yīng)用線程
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {
continue;
}
// 把連接添加到連接池
boolean result = put(connection);
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0;
if (closing || closed) {
break;
}
}
}
CreateConnectionThread的run() 方法整體就是在一個(gè)死循環(huán)中不斷的等待,被喚醒,然后創(chuàng)建線程。當(dāng)一個(gè)物理連接被創(chuàng)建出來后,會(huì)調(diào)用DruidDataSource#put方法將其放到連接池connections中,put() 方法源碼如下所示。
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
// 省略
return false;
}
return put(holder, physicalConnectionInfo.createTaskId, false);
}
private boolean put(DruidConnectionHolder holder,
long createTaskId, boolean checkExists) {
// 涉及到連接池中連接數(shù)量改變的操作,都需要加鎖
lock.lock();
try {
if (this.closing || this.closed) {
return false;
}
// 池中已有連接數(shù)已經(jīng)大于等于最大連接數(shù),則不再把連接加到連接池并直接返回false
if (poolingCount >= maxActive) {
if (createScheduler != null) {
clearCreateTask(createTaskId);
}
return false;
}
// 檢查重復(fù)添加
if (checkExists) {
for (int i = 0; i < poolingCount; i++) {
if (connections[i] == holder) {
return false;
}
}
}
// 連接放入連接池
connections[poolingCount] = holder;
// poolingCount++
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
// 喚醒在notEmpty上等待連接的應(yīng)用線程
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
clearCreateTask(createTaskId);
if (poolingCount + createTaskCount < notEmptyWaitThreadCount
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}
put() 方法會(huì)先將物理連接從PhysicalConnectionInfo中獲取出來并封裝成一個(gè)DruidConnectionHolder,DruidConnectionHolder就是Druid連接池中的連接。新添加的連接會(huì)存放在連接池?cái)?shù)組connections的poolingCount位置,然后poolingCount會(huì)加1,也就是poolingCount代表著連接池中可以獲取的連接的數(shù)量。
二. DruidDataSource連接銷毀
DruidDataSource連接的銷毀由DestroyConnectionThread線程完成,其run() 方法如下所示。
public void run() {
// run()方法只要執(zhí)行了,就調(diào)用initedLatch#countDown
initedLatch.countDown();
for (; ; ) {
// 每間隔timeBetweenEvictionRunsMillis執(zhí)行一次DestroyTask的run()方法
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000);
}
if (Thread.interrupted()) {
break;
}
// 執(zhí)行DestroyTask的run()方法來銷毀需要銷毀的連接
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
DestroyConnectionThread的run() 方法就是在一個(gè)死循環(huán)中每間隔timeBetweenEvictionRunsMillis的時(shí)間就執(zhí)行一次DestroyTask的run() 方法。DestroyTask#run方法實(shí)現(xiàn)如下所示。
public void run() {
// 根據(jù)一系列條件判斷并銷毀連接
shrink(true, keepAlive);
// RemoveAbandoned機(jī)制
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
在DestroyTask#run方法中會(huì)調(diào)用DruidDataSource#shrink方法來根據(jù)設(shè)定的條件來判斷出需要銷毀和?;畹倪B接。DruidDataSource#shrink方法如下所示。
// checkTime參數(shù)表示在將一個(gè)連接進(jìn)行銷毀前,是否需要判斷一下空閑時(shí)間
public void shrink(boolean checkTime, boolean keepAlive) {
// 加鎖
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
// needFill = keepAlive && poolingCount + activeCount < minIdle
// needFill為true時(shí),會(huì)調(diào)用empty.signal()喚醒生產(chǎn)連接的線程來生產(chǎn)連接
boolean needFill = false;
// evictCount記錄需要銷毀的連接數(shù)
// keepAliveCount記錄需要?;畹倪B接數(shù)
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
// checkCount = 池中已有連接數(shù) - 最小空閑連接數(shù)
// 正常情況下,最多能夠?qū)⑶癱heckCount個(gè)連接進(jìn)行銷毀
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// 正常情況下,需要遍歷池中所有連接
// 從前往后遍歷,i為數(shù)組索引
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
// 如果發(fā)生了致命錯(cuò)誤(onFatalError == true)且致命錯(cuò)誤發(fā)生時(shí)間(lastFatalErrorTimeMillis)在連接建立時(shí)間之后
// 把連接加入到?;钸B接數(shù)組中
if ((onFatalError || fatalErrorIncrement > 0)
&& (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
// phyTimeoutMillis表示連接的物理存活超時(shí)時(shí)間,默認(rèn)值是-1
if (phyTimeoutMillis > 0) {
// phyConnectTimeMillis表示連接的物理存活時(shí)間
long phyConnectTimeMillis = currentTimeMillis
- connection.connectTimeMillis;
// 連接的物理存活時(shí)間大于phyTimeoutMillis,則將這個(gè)連接放入evictConnections數(shù)組
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
// idleMillis表示連接的空閑時(shí)間
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
// minEvictableIdleTimeMillis表示連接允許的最小空閑時(shí)間,默認(rèn)是30分鐘
// keepAliveBetweenTimeMillis表示?;铋g隔時(shí)間,默認(rèn)是2分鐘
// 如果連接的空閑時(shí)間小于minEvictableIdleTimeMillis且還小于keepAliveBetweenTimeMillis
// 則connections數(shù)組中當(dāng)前連接之后的連接都會(huì)滿足空閑時(shí)間小于minEvictableIdleTimeMillis且還小于keepAliveBetweenTimeMillis
// 此時(shí)跳出遍歷,不再檢查其余的連接
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
// 連接的空閑時(shí)間大于等于允許的最小空閑時(shí)間
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
// i < checkCount這個(gè)條件的理解如下:
// 每次shrink()方法執(zhí)行時(shí),connections數(shù)組中只有索引0到checkCount-1的連接才允許被銷毀
// 這樣才能保證銷毀完連接后,connections數(shù)組中至少還有minIdle個(gè)連接
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
// 如果空閑時(shí)間過久,已經(jīng)大于了允許的最大空閑時(shí)間(默認(rèn)7小時(shí))
// 那么無論如何都要銷毀這個(gè)連接
evictConnections[evictCount++] = connection;
continue;
}
}
// 如果開啟了?;顧C(jī)制,且連接空閑時(shí)間大于等于了保活間隔時(shí)間
// 此時(shí)將連接加入到保活連接數(shù)組中
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
// checkTime為false,那么前checkCount個(gè)連接直接進(jìn)行銷毀,不再判斷這些連接的空閑時(shí)間是否超過閾值
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
// removeCount = 銷毀連接數(shù) + 保活連接數(shù)
// removeCount表示本次從connections數(shù)組中拿掉的連接數(shù)
// 注:一定是從前往后拿,正常情況下最后minIdle個(gè)連接是安全的
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
// [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
// [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null]
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
// 更新池中連接數(shù)
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
// 如果池中連接數(shù)加上活躍連接數(shù)(借出去的連接)小于最小空閑連接數(shù)
// 則將needFill設(shè)為true,后續(xù)需要喚醒生產(chǎn)連接的線程來生產(chǎn)連接
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
// 遍歷evictConnections數(shù)組,銷毀其中的連接
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// 遍歷keepAliveConnections數(shù)組,對(duì)其中的連接做可用性校驗(yàn)
// 校驗(yàn)通過連接就放入connections數(shù)組,沒通過連接就銷毀
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
// 如果needFill為true則喚醒生產(chǎn)連接的線程來生產(chǎn)連接
if (needFill) {
lock.lock();
try {
// 計(jì)算需要生產(chǎn)連接的個(gè)數(shù)
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
}
在DruidDataSource#shrink方法中,核心邏輯是遍歷connections數(shù)組中的連接,并判斷這些連接是需要銷毀還是需要保活。通常情況下,connections數(shù)組中的前checkCount(checkCount = poolingCount - minIdle) 個(gè)連接是危險(xiǎn)的,因?yàn)檫@些連接只要滿足了:空閑時(shí)間 >= minEvictableIdleTimeMillis(允許的最小空閑時(shí)間),那么就需要被銷毀,而connections數(shù)組中的最后minIdle個(gè)連接是相對(duì)安全的,因?yàn)檫@些連接只有在滿足:空閑時(shí)間 > maxEvictableIdleTimeMillis(允許的最大空閑時(shí)間) 時(shí),才會(huì)被銷毀。這么判斷的原因,主要就是需要讓連接池里能夠保證至少有minIdle個(gè)空閑連接可以讓應(yīng)用線程獲取。
當(dāng)確定好了需要銷毀和需要保活的連接后,此時(shí)會(huì)先將connections數(shù)組清理,只保留安全的連接,這個(gè)過程示意圖如下。

最后,會(huì)遍歷evictConnections數(shù)組,銷毀數(shù)組中的連接,遍歷keepAliveConnections數(shù)組,對(duì)其中的每個(gè)連接做可用性校驗(yàn),如果校驗(yàn)可用,那么就重新放回connections數(shù)組,否則銷毀。
總結(jié)
連接的創(chuàng)建由一個(gè)叫做CreateConnectionThread的線程完成,整體流程就是在一個(gè)死循環(huán)中不斷的等待,被喚醒,然后創(chuàng)建連接。每一個(gè)被創(chuàng)建出來的物理連接java.sql.Connection會(huì)被封裝為一個(gè)DruidConnectionHolder,然后存放到connections數(shù)組中。
連接的銷毀由一個(gè)叫做DestroyConnectionThread的線程完成,核心邏輯是周期性的遍歷connections數(shù)組中的連接,并判斷這些連接是需要銷毀還是需要?;?,需要銷毀的連接最后會(huì)被物理銷毀,需要?;畹倪B接最后會(huì)進(jìn)行一次可用性校驗(yàn),如果校驗(yàn)不通過,則進(jìn)行物理銷毀。
以上就是Druid之連接創(chuàng)建及銷毀示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Druid連接創(chuàng)建銷毀的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中的引用和動(dòng)態(tài)代理的實(shí)現(xiàn)詳解
這篇文章主要介紹了Java中的引用和動(dòng)態(tài)代理的實(shí)現(xiàn)詳解,涉及Java中的引用類型,JVMGC的可達(dá)性分析,代理模式等相關(guān)內(nèi)容,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11
java多線程編程之使用thread類創(chuàng)建線程
在Java中創(chuàng)建線程有兩種方法:使用Thread類和使用Runnable接口。在使用Runnable接口時(shí)需要建立一個(gè)Thread實(shí)例2014-01-01
java web項(xiàng)目里ehcache.xml介紹
java web項(xiàng)目里ehcache.xml介紹,需要的朋友可以參考一下2013-03-03
mybatis中Oracle參數(shù)為NULL錯(cuò)誤問題及解決
這篇文章主要介紹了mybatis中Oracle參數(shù)為NULL錯(cuò)誤問題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12
使用Java和高德地圖API將經(jīng)緯度轉(zhuǎn)換為地理位置信息的步驟
這篇文章詳細(xì)介紹了如何將GPS坐標(biāo)轉(zhuǎn)換為人類可讀的地理位置,介紹了環(huán)境準(zhǔn)備、代碼實(shí)現(xiàn)、異常處理及優(yōu)化步驟,首先創(chuàng)建LocationFinder類,實(shí)現(xiàn)getLocationFromCoordinates方法,利用高德逆地理編碼API轉(zhuǎn)換坐標(biāo),文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-09-09
Java安全之Filter權(quán)限繞過的實(shí)現(xiàn)
在一些需要挖掘一些無條件RCE中,大部分類似于一些系統(tǒng)大部分地方都做了權(quán)限控制的,而這時(shí)候想要利用權(quán)限繞過就顯得格外重要,本文就介紹了如何實(shí)現(xiàn),一起來了解一下2021-05-05

