Source Code Analysis of Druid's keepalive Mechanism
DruidDataSource
public class DruidDataSource extends DruidAbstractDataSource
        implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable,
        ConnectionPoolDataSource, MBeanRegistration {
    private int keepAliveCheckCount = 0;
    private DruidConnectionHolder[] keepAliveConnections;
    private volatile boolean keepAlive = false;

    // from DruidAbstractDataSource
    protected volatile long keepAliveBetweenTimeMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS * 2;
    public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 60 * 1000L;

    public void init() throws SQLException {
        //......
        if (keepAlive) {
            // async fill to minIdle
            if (createScheduler != null) {
                for (int i = 0; i < minIdle; ++i) {
                    submitCreateTask(true);
                }
            } else {
                this.emptySignal();
            }
        }
        //......
    }
}
When keepAlive is enabled, the init method of DruidDataSource triggers connection creation. If createScheduler is not null (it is null by default), init calls submitCreateTask(true) minIdle times; otherwise it calls emptySignal.
submitCreateTask
com/alibaba/druid/pool/DruidDataSource.java
private void submitCreateTask(boolean initTask) {
    createTaskCount++;
    CreateConnectionTask task = new CreateConnectionTask(initTask);
    if (createTasks == null) {
        createTasks = new long[8];
    }

    boolean putted = false;
    for (int i = 0; i < createTasks.length; ++i) {
        if (createTasks[i] == 0) {
            createTasks[i] = task.taskId;
            putted = true;
            break;
        }
    }
    if (!putted) {
        long[] array = new long[createTasks.length * 3 / 2];
        System.arraycopy(createTasks, 0, array, 0, createTasks.length);
        array[createTasks.length] = task.taskId;
        createTasks = array;
    }

    this.createSchedulerFuture = createScheduler.submit(task);
}
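submitCreateTask creates a CreateConnectionTask, records its taskId in the first free slot of the createTasks array (growing the array by half when it is full), and then submits the task to createScheduler.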
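The slot bookkeeping in submitCreateTask is easy to miss in the listing, so here is a minimal stand-alone sketch of it. The class CreateTaskSlots and its methods are hypothetical names introduced only for illustration; the clear method is an assumption about what a clearCreateTask-style cleanup would do, since that method is not shown in the excerpt.

```java
import java.util.Arrays;

// Hypothetical stand-alone model of the createTasks bookkeeping; not Druid's code.
final class CreateTaskSlots {
    // 0 marks a free slot, so task ids are assumed to be non-zero.
    private long[] slots = new long[8];

    // Stores taskId in the first free slot; grows the array by 1.5x when none is free.
    void register(long taskId) {
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] == 0) {
                slots[i] = taskId;
                return;
            }
        }
        int oldLength = slots.length;
        slots = Arrays.copyOf(slots, oldLength * 3 / 2);
        slots[oldLength] = taskId; // first slot of the newly added region
    }

    // Frees the slot holding taskId; returns false if the id was not registered.
    // Assumption: this is roughly what a clearCreateTask-style cleanup does.
    boolean clear(long taskId) {
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] == taskId) {
                slots[i] = 0;
                return true;
            }
        }
        return false;
    }

    int capacity() {
        return slots.length;
    }

    int inUse() {
        int count = 0;
        for (long id : slots) {
            if (id != 0) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        CreateTaskSlots tasks = new CreateTaskSlots();
        for (long id = 1; id <= 9; id++) {
            tasks.register(id);
        }
        System.out.println(tasks.capacity() + " slots, " + tasks.inUse() + " in use");
    }
}
```

Using 0 as the "free" marker keeps the structure a plain long[] with no wrapper objects, at the cost of a linear scan on every registration, which is cheap for the handful of tasks a pool has in flight.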
CreateConnectionTask
com/alibaba/druid/pool/DruidDataSource.java
public class CreateConnectionTask implements Runnable {
    private int errorCount;
    private boolean initTask;
    private final long taskId;

    public CreateConnectionTask() {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
    }

    public CreateConnectionTask(boolean initTask) {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
        this.initTask = initTask;
    }

    @Override
    public void run() {
        runInternal();
    }

    private void runInternal() {
        for (; ; ) {
            // addLast
            lock.lock();
            try {
                if (closed || closing) {
                    clearCreateTask(taskId);
                    return;
                }

                boolean emptyWait = true;

                if (createError != null && poolingCount == 0) {
                    emptyWait = false;
                }

                if (emptyWait) {
                    // only create a connection when some thread is waiting for one
                    if (poolingCount >= notEmptyWaitThreadCount //
                            && (!(keepAlive && activeCount + poolingCount < minIdle)) // must not give up creating in the keepAlive scenario
                            && (!initTask) // tasks submitted during pool initialization must not give up creating
                            && !isFailContinuous() // must not give up while failContinuous, otherwise creation becomes impossible
                            && !isOnFatalError() // must not give up while onFatalError, otherwise creation becomes impossible
                    ) {
                        clearCreateTask(taskId);
                        return;
                    }

                    // prevent creating more than maxActive connections
                    if (activeCount + poolingCount >= maxActive) {
                        clearCreateTask(taskId);
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }

            PhysicalConnectionInfo physicalConnection = null;

            try {
                physicalConnection = createPhysicalConnection();
            } catch (OutOfMemoryError e) {
                LOG.error("create connection OutOfMemoryError, out memory. ", e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl, e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                // unknow fatal exception
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }
                LOG.error("create connection Error", e);
                // unknow fatal exception
                setFailContinuous(true);
                break;
            } catch (Throwable e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }
                LOG.error("create connection unexecpted error.", e);
                break;
            }

            if (physicalConnection == null) {
                continue;
            }

            physicalConnection.createTaskId = taskId;
            boolean result = put(physicalConnection);
            if (!result) {
                JdbcUtils.close(physicalConnection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }
            break;
        }
    }
}
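CreateConnectionTask mainly creates a physicalConnection and then puts it into connections via put. When emptyWait is true, it first checks whether a new connection is needed at all; if not, it calls clearCreateTask(taskId) and returns without creating one. Unlike CreateConnectionThread, it never blocks on empty.await().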
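The early-exit condition in runInternal is dense, so the following sketch restates it as a pure function. CreateTaskGate, Pool and shouldAbandon are hypothetical names introduced for illustration, and createErrorPending stands in for the `createError != null` check in the excerpt; the boolean logic itself is copied from the listing above.

```java
// Hypothetical stand-alone model of the early-exit check; not Druid's code.
final class CreateTaskGate {
    // Snapshot of the counters and flags the excerpt reads while holding the lock.
    static final class Pool {
        int poolingCount;
        int activeCount;
        int waitingThreads; // models notEmptyWaitThreadCount
        int minIdle;
        int maxActive;
        boolean keepAlive;
        boolean initTask;
        boolean failContinuous;
        boolean onFatalError;
        boolean createErrorPending; // models createError != null
    }

    // Returns true when the task would call clearCreateTask(taskId) and return.
    static boolean shouldAbandon(Pool p) {
        if (p.createErrorPending && p.poolingCount == 0) {
            return false; // emptyWait == false: both checks are skipped
        }
        boolean nobodyNeedsOne = p.poolingCount >= p.waitingThreads
                && !(p.keepAlive && p.activeCount + p.poolingCount < p.minIdle)
                && !p.initTask
                && !p.failContinuous
                && !p.onFatalError;
        if (nobodyNeedsOne) {
            return true;
        }
        // Never exceed maxActive, even for init or keepAlive tasks.
        return p.activeCount + p.poolingCount >= p.maxActive;
    }

    public static void main(String[] args) {
        Pool p = new Pool();
        p.poolingCount = 2;
        p.minIdle = 5;
        p.maxActive = 10;
        System.out.println("keepAlive off: abandon = " + shouldAbandon(p));
        p.keepAlive = true;
        System.out.println("keepAlive on:  abandon = " + shouldAbandon(p));
    }
}
```

The keepAlive term is what lets a task proceed even though no thread is waiting: as long as activeCount + poolingCount is below minIdle, the task keeps going and the pool is topped up.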
CreateConnectionThread
public class CreateConnectionThread extends Thread {
    public CreateConnectionThread(String name) {
        super(name);
        this.setDaemon(true);
    }

    public void run() {
        initedLatch.countDown();

        long lastDiscardCount = 0;
        int errorCount = 0;
        for (; ; ) {
            // addLast
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e2) {
                break;
            }

            long discardCount = DruidDataSource.this.discardCount;
            boolean discardChanged = discardCount - lastDiscardCount > 0;
            lastDiscardCount = discardCount;

            try {
                boolean emptyWait = true;

                if (createError != null && poolingCount == 0 && !discardChanged) {
                    emptyWait = false;
                }

                if (emptyWait && asyncInit && createCount < initialSize) {
                    emptyWait = false;
                }

                if (emptyWait) {
                    // only create a connection when some thread is waiting for one
                    if (poolingCount >= notEmptyWaitThreadCount //
                            && (!(keepAlive && activeCount + poolingCount < minIdle))
                            && !isFailContinuous()
                    ) {
                        empty.await();
                    }

                    // prevent creating more than maxActive connections
                    if (activeCount + poolingCount >= maxActive) {
                        empty.await();
                        continue;
                    }
                }
            } catch (InterruptedException e) {
                lastCreateError = e;
                lastErrorTimeMillis = System.currentTimeMillis();

                if ((!closing) && (!closed)) {
                    LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
                }
                break;
            } finally {
                lock.unlock();
            }

            PhysicalConnectionInfo connection = null;

            try {
                connection = createPhysicalConnection();
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                        + ", state " + e.getSQLState(), e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        break;
                    }

                    try {
                        Thread.sleep(timeBetweenConnectErrorMillis);
                    } catch (InterruptedException interruptEx) {
                        break;
                    }
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                LOG.error("create connection Error", e);
                setFailContinuous(true);
                break;
            }

            if (connection == null) {
                continue;
            }

            boolean result = put(connection);
            if (!result) {
                JdbcUtils.close(connection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }

            errorCount = 0; // reset errorCount

            if (closing || closed) {
                break;
            }
        }
    }
}
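The logic of CreateConnectionThread is similar to that of CreateConnectionTask, with a fair amount of duplicated code; the two do not look as if they were written by the same person. The main difference is that the thread loops forever and blocks on empty.await() when no connection is needed, whereas the task simply returns. CreateConnectionThread is started by createAndStartCreatorThread, which is called from the init method of DruidDataSource, and it appears to be started only once.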
shrink
public void shrink(boolean checkTime, boolean keepAlive) {
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }

    boolean needFill = false;
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;

    try {
        if (!inited) {
            return;
        }

        final int checkCount = poolingCount - minIdle;
        final long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < poolingCount; ++i) {
            DruidConnectionHolder connection = connections[i];

            if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }

            if (checkTime) {
                if (phyTimeoutMillis > 0) {
                    long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {
                    break;
                }

                if (idleMillis >= minEvictableIdleTimeMillis) {
                    if (checkTime && i < checkCount) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }

        int removeCount = evictCount + keepAliveCount;
        if (removeCount > 0) {
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;

        if (keepAlive && poolingCount + activeCount < minIdle) {
            needFill = true;
        }
    } finally {
        lock.unlock();
    }

    if (evictCount > 0) {
        for (int i = 0; i < evictCount; ++i) {
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }

    if (keepAliveCount > 0) {
        // keep order
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();

            boolean validate = false;
            try {
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
                // skip
            }

            boolean discard = !validate;
            if (validate) {
                holer.lastKeepTimeMillis = System.currentTimeMillis();
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {
                    discard = true;
                }
            }

            if (discard) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // skip
                }

                lock.lock();
                try {
                    discardCount++;

                    if (activeCount + poolingCount <= minIdle) {
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }

    if (needFill) {
        lock.lock();
        try {
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {
        lock.lock();
        try {
            emptySignal();
        } finally {
            lock.unlock();
        }
    }
}
DestroyConnectionThread runs destroyTask once every timeBetweenEvictionRunsMillis, and the run method of DestroyTask mainly calls shrink(true, keepAlive).
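The shrink method iterates over connections up to poolingCount. When checkTime is true, it uses idleMillis to decide whether a connection should be evicted; if the connection is not evicted, it then decides whether it needs a keepalive check (keepAlive && idleMillis >= keepAliveBetweenTimeMillis). Connections that need the check are placed in keepAliveConnections and removed from the pool. After the lock is released, shrink iterates over keepAliveConnections and calls validateConnection on each one: on success it updates lastKeepTimeMillis and puts the holder back into the pool, otherwise it calls connection.close(). Finally it clears the keepAliveConnections array.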
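To make the per-connection decision in the checkTime branch easier to follow, here is a minimal sketch that restates it as a pure function. ShrinkDecision, Action and classify are hypothetical names introduced for illustration. The sketch deliberately leaves out the phyTimeoutMillis check and the fatal-error branch, and the threshold values in main are example numbers only, not a statement about Druid's defaults.

```java
// Hypothetical stand-alone model of the checkTime branch of shrink; not Druid's code.
final class ShrinkDecision {
    enum Action { EVICT, KEEP_ALIVE_CHECK, KEEP, STOP_SCAN }

    // index:      position of the connection in the pool array
    // checkCount: poolingCount - minIdle, i.e. how many connections exceed minIdle
    static Action classify(int index, int checkCount, long idleMillis,
                           long minEvictableIdleTimeMillis,
                           long maxEvictableIdleTimeMillis,
                           long keepAliveBetweenTimeMillis,
                           boolean keepAlive) {
        // Too fresh for either eviction or a keepalive check: the loop breaks here.
        if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis) {
            return Action.STOP_SCAN;
        }
        if (idleMillis >= minEvictableIdleTimeMillis) {
            if (index < checkCount) {
                return Action.EVICT; // surplus connection above minIdle
            }
            if (idleMillis > maxEvictableIdleTimeMillis) {
                return Action.EVICT; // too old even though it is within minIdle
            }
        }
        if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
            return Action.KEEP_ALIVE_CHECK; // validate it, then return it to the pool
        }
        return Action.KEEP;
    }

    public static void main(String[] args) {
        long minEvict = 30 * 60 * 1000L;     // example value
        long maxEvict = 7 * 60 * 60 * 1000L; // example value
        long keepAliveBetween = 120 * 1000L; // example value
        long[] idleSamples = {60_000L, 150_000L, 2_000_000L, 26_000_000L};
        for (long idle : idleSamples) {
            Action a = classify(3, 2, idle, minEvict, maxEvict, keepAliveBetween, true);
            System.out.println("idle=" + idle + "ms -> " + a);
        }
    }
}
```

Note that eviction is tested before the keepalive check, so a connection that is both evictable and overdue for a check is closed rather than validated.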
Summary
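DestroyConnectionThread runs destroyTask once every timeBetweenEvictionRunsMillis, and the run method of DestroyTask mainly calls shrink(true, keepAlive). That method handles both eviction and keepalive: it iterates over connections up to poolingCount and, when checkTime is true, uses idleMillis to decide whether a connection should be evicted; if not, it decides whether the connection needs a keepalive check (keepAlive && idleMillis >= keepAliveBetweenTimeMillis). Connections that need the check are placed in keepAliveConnections and then validated one by one with validateConnection: on success lastKeepTimeMillis is updated and the connection goes back into the pool, otherwise connection.close() is called. Finally the keepAliveConnections array is cleared.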
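Jedis implements keepalive by simply calling socket.setKeepAlive(true), whereas commons-pool has no keepalive concept as such. In essence, Druid's keepalive is similar to testWhileIdle in commons-pool. The difference is that Druid performs its testWhileIdle check directly inside getConnection, which is a slightly odd design. If that check were removed and the keepAliveBetweenTimeMillis condition were dropped from the keepAlive logic in shrink, the behavior would match the testWhileIdle logic of commons-pool. Druid's keepalive therefore amounts to testWhileIdle with a keepAliveBetweenTimeMillis threshold.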
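For contrast with the pool-level validation above, this is a minimal sketch of the socket-level approach using only the standard java.net.Socket API. TcpKeepAliveSketch and enableKeepAlive are hypothetical names, and the sketch does not reproduce Jedis code; it only shows that SO_KEEPALIVE can be switched on before the socket is connected, after which probing is left to the operating system rather than to application code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

// Hypothetical illustration of TCP-level keepalive; not taken from Jedis or Druid.
final class TcpKeepAliveSketch {
    // Enables SO_KEEPALIVE on a fresh, unconnected socket and reports the option value.
    static boolean enableKeepAlive() {
        try (Socket socket = new Socket()) {
            socket.setKeepAlive(true);
            return socket.getKeepAlive();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("SO_KEEPALIVE enabled: " + enableKeepAlive());
    }
}
```

A TCP keepalive probe only shows that the peer's network stack is reachable, whereas a pool-level check such as validateConnection exercises the database session itself, which is why the two mechanisms are not interchangeable.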