解析ConcurrentHashMap: put方法源碼分析
上一章:預熱(內部一些小方法分析)
put()
方法是并發(fā)HashMap源碼分析的重點方法,這里涉及到并發(fā)擴容,桶位尋址等等…- JDK1.8 ConcurrentHashMap結構圖:
1、put方法源碼解析
// 向并發(fā)Map中put一個數據 public V put(K key, V value) { return putVal(key, value, false); } // 向并發(fā)Map中put一個數據 // Key: 數據的鍵 // value:數據的值 // onlyIfAbsent:是否替換數據: // 如果為false,則當put數據時,遇到Map中有相同K,V的數據,則將其替換 // 如果為true,則當put數據時,遇到Map中有相同K,V的數據,則不做替換,不插入 final V putVal(K key, V value, boolean onlyIfAbsent) { // 控制k 和 v 不能為null if (key == null || value == null) throw new NullPointerException(); // 通過spread方法,可以讓高位也能參與進尋址運算,使最終得到的hash值更分散 int hash = spread(key.hashCode()); // binCount表示當前k-v 封裝成node插入到指定桶位后,在桶位中的所屬鏈表的下標位置 // =0 表示當前桶位為null,node可以直接放入 // >0 表示當前桶位中有節(jié)點,遍歷鏈表,將封裝k-v的新node放入鏈表末尾,并記錄鏈表末尾的下標binCount // =2 表示當前桶位**可能**已經樹化為紅黑樹了 int binCount = 0; // tab 引用map對象的table // 自旋 for (Node<K,V>[] tab = table;;) { // f 表示桶位的頭結點 // n 表示散列表數組的長度 // i 表示key通過尋址計算后,得到的桶位下標 // fh 表示桶位頭結點的hash值 Node<K,V> f; int n, i, fh; // ----------------------------------------------------------------------------- // CASE1:成立,表示當前map中的table尚未初始化... if (tab == null || (n = tab.length) == 0) // 對map中的table進行初始化 tab = initTable(); // ----------------------------------------------------------------------------- // CASE2:table已經初始化,此時根據尋址算法確定一個桶,并且桶的頭結點f為null // i 表示key使用路由尋址算法得到key對應table數組的下標位置,tabAt方法獲取指定桶位i的頭結點f else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 這時候就可以將封裝k-v的結點直接放入桶 // casTabAt通過CAS的方式去向Node數組指定位置i設置節(jié)點值,設置成功返回true 否則返回false if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } // ----------------------------------------------------------------------------- // CASE3:table已經初始化,尋址得到的桶位中的頭結點f不是null,如果該頭結點f的hash值fh=-1: // 則,表示當前節(jié)點是FWD(forwarding)節(jié)點 // 如果CASE3條件成立:表示當前桶位的頭結點為 FWD結點,表示目前map正處于擴容過程中.. else if ((fh = f.hash) == MOVED) // 發(fā)現(xiàn)f是FWD結點后,當前結點有義務去幫助當前map對象完成數據遷移工作... // 等學完擴容再來分析這里~ tab = helpTransfer(tab, f); // ----------------------------------------------------------------------------- // CASE4:CASE1-3都不滿足時,那么當前桶位存放的可能是鏈表也可能是紅黑樹代理結點TreeBin else { // 保留替換之前的數據引用:當新插入的key存在后,會將舊值賦給OldVal,返回給put方法調用 V oldVal = null; // 使用synchronized加鎖“頭節(jié)點”(**理論上是“頭結點”**) synchronized (f) { // ----------------------------------------------------------------------- // CASE5:tabAt(tab, i) == f // 對比一下當前桶位的頭節(jié)點是否為之前獲取的頭結點:為什么又要對比一下? // 如果其它線程在當前線程之前將該桶位的頭結點修改掉了,當前線程再使用sync對該節(jié)點f加鎖就有問題了(鎖本身加錯了地方~) if (tabAt(tab, i) == f) {// 如果條件成立,說明加鎖的對象f沒有問題,持有鎖! // ------------------------------------------------------------------ // CASE6:fh >= 0) // 如果條件成立,說明當前桶位就是普通鏈表桶位,這里回顧下常量屬性字段: // static final int MOVED = -1; 表示當前節(jié)點是FWD(forwarding)節(jié)點 // static final int TREEBIN = -2; 表示當前節(jié)點已經樹化 if (fh >= 0) { // 1.當前插入key與鏈表當中所有元素的key都不一致時,當前的插入操作是追加到鏈表的末尾,binCount此時表示鏈表長度 // 2.當前插入key與鏈表當中的某個元素的key一致時,當前插入操作可能就是替換了。binCount表示沖突位置(binCount - 1) binCount = 1; // 迭代循環(huán)當前桶位的鏈表,e是每次循環(huán)處理節(jié)點。 for (Node<K,V> e = f;; ++binCount) { // 當前循環(huán)遍歷節(jié)點的key K ek; // -------------------------------------------------------- // CASE7: // 條件一:e.hash == hash // 成立:表示循環(huán)的當前元素的hash值與插入節(jié)點的hash值一致,需要進一步判斷 // 條件二:((ek = e.key) == key ||(ek != null && key.equals(ek))) // 成立:說明循環(huán)的當前節(jié)點與插入節(jié)點的key一致,確實發(fā)生沖突了~ if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 將當前循環(huán)的元素的值賦值給oldVal oldVal = e.val; // 傳入的putVal方法參數onlyIfAbsent:是否替換數據: // false,當put數據時,遇到Map中有相同K,V的數據,則將其替換 // true,當put數據時,遇到Map中有相同K,V的數據,則不做替換,不插入 if (!onlyIfAbsent) e.val = value; break; } // -------------------------------------------------------- // CASE8: // 當前元素與插入元素的key不一致時,會走下面程序: // 1.更新循環(huán)遍歷的節(jié)點為當前節(jié)點的下一個節(jié)點 // 2.判斷下一個節(jié)點是否為null,如果是null,說明當前節(jié)點已經是隊尾了,插入數據需要追加到隊尾節(jié)點的后面。 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // ------------------------------------------------------------------ // CASE9:f instanceof TreeBin // 如果條件成立,表示當前桶位是紅黑樹代理結點TreeBin //(前置條件:該桶位一定不是鏈表) else if (f instanceof TreeBin) { // p表示紅黑樹中如果與你插入節(jié)點的key 有沖突節(jié)點的話,則putTreeVal方法會返回沖突節(jié)點的引用。 Node<K,V> p; // 強制設置binCount為2,因為binCount <= 1 時有其它含義,所以這里設置為了2 (回頭講addCount的時候會詳細介紹) binCount = 2; // 條件一成立,說明當前插入節(jié)點的key與紅黑樹中的某個節(jié)點的key一致,沖突了: // putTreeVal向紅黑樹中插入結點,插入成功返回null,否則返回沖突結點對象 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 將沖突節(jié)點的值賦值給oldVal oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // ------------------------------------------------------------------ // CASE10:binCount != 0 // 說明當前桶位不為null,可能是紅黑樹也可能是鏈表 if (binCount != 0) { // 如果binCount>=8 表示處理的桶位一定是鏈表 if (binCount >= TREEIFY_THRESHOLD) // 因為桶中鏈表長度大于了8,需要樹化: // 調用轉化鏈表為紅黑樹的方法 treeifyBin(tab, i); // 說明當前線程插入的數據key,與原有k-v發(fā)生沖突,需要將原數據v返回給調用者。 if (oldVal != null) return oldVal; break; } } } // Map中的元素數據量累加方法:有額外的以下功能 // 1.統(tǒng)計當前table一共有多少數據 // 2.并判斷是否達到擴容閾值標準,觸發(fā)擴容。 addCount(1L, binCount); return null; }
2、initTable方法源碼分析
第一次放元素時,初始化桶數組:
/** * table初始化 */ private final Node<K,V>[] initTable() { // tab: 引用map.table // sc: sizeCtl的臨時值 // sizeCtl:默認為0,用來控制table的狀態(tài)、以及初始化和擴容操作: // sizeCtl<0表示table的狀態(tài): //(1)=-1,表示有其他線程正在進行初始化操作。(其他線程就不能再進行初始化,相當于一把鎖) //(2)=-(1 + nThreads),表示有n個線程正在一起擴容。 // sizeCtl>=0表示table的初始化和擴容相關操作: //(3)=0,默認值,后續(xù)在真正初始化table的時候使用,設置為默認容量DEFAULT_CAPACITY --> 16。 //(4)>0,將sizeCtl設置為table初始容量或擴容完成后的下一次擴容的門檻。 Node<K,V>[] tab; int sc; // 附加條件的自旋: 條件是map.table尚未初始化... while ((tab = table) == null || tab.length == 0) { // ----------------------------------------------------------------------------- // CASE1: sizeCtl) < 0 // sizeCtl < 0可能是以下2種情況: //(1)-1,表示有其他線程正在進行table初始化操作。 //(2)-(1 + nThreads),表示有n個線程正在一起擴容。 if ((sc = sizeCtl) < 0) // 這里sizeCtl大概率就是-1,表示其它線程正在進行創(chuàng)建table的過程,當前線程沒有競爭到初始化table的鎖,進而當前線程被迫等待... Thread.yield(); // ----------------------------------------------------------------------------- // CASE2:sizeCtl) >= 0 且U.compareAndSwapInt(this, SIZECTL, sc, -1)結果為true // U.compareAndSwapInt(this, SIZECTL, sc, -1):以CAS的方式修改當前線程的sizeCtl為-1, // sizeCtl如果成功被修改為-1,就返回true,否則返回false。 // 當返回true時,則該線程就可以進入下面的else if代碼塊中,這時候sizeCtl=-1相當于是一把鎖,表示下面的else if代碼塊已經被該線程占用,其他線程不能再進入~ else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // ---------------------------------------------------------------------- // CASE3: 這里為什么又要判斷呢? // 為了防止其它線程已經初始化table完畢了,然后當前線程再次對其初始化..導致丟失數據。 // 如果條件成立,說明其它線程都沒有進入過這個if塊,當前線程就是具備初始化table權利了。 if ((tab = table) == null || tab.length == 0) { // sc大于等于0的情況如下: //(3)=0,默認值,后續(xù)在真正初始化table的時候使用,設置為默認容量DEFAULT_CAPACITY --> 16。 //(4)>0,將sizeCtl設置為table初始容量或擴容完成后的下一次擴容的門檻。 // 如果sc大于0,則創(chuàng)建table時使用sc為指定table初始容量大小, // 否則使用16默認值DEFAULT_CAPACITY int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") // 創(chuàng)建新數組nt Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 將新數組nt賦值給table、tab table = tab = nt; // sc設置為下次散列表擴容的門檻:0.75n sc = n - (n >>> 2); } } finally { // 將sc賦值給sizeCtl,分為一下2種情況: // 1、當前線程既通過了CASE2的判斷,也通過了CASE3的判斷: // 則,當前線程是第一次創(chuàng)建map.table的線程,那么,sc就表示下一次擴容的閾值。 // 2、當線程通過了CASE2的判斷,但是沒有通過CASE3的判斷: // 則,當前線程并不是第一次創(chuàng)建map.table的線程,當前線程通過CASE2的判斷時,將 // sizeCtl設置為了-1 ,那么在該線程結束上面的代碼邏輯之后,需要將sc修改回進入CASE2之前的sc值。 sizeCtl = sc; } break; } } return tab; }
(1)使用CAS鎖控制只有一個線程初始化桶數組;
(2)sizeCtl在初始化后存儲的是擴容門檻;
(3)擴容門檻寫死的是桶數組大小的0.75倍,桶數組大小即map的容量,也就是最多存儲多少個元素。
3、addCount方法(難點)
在閱讀addCount
方法源碼之前,最好再去熟悉下LongAdder
源碼:一篇帶你解析入門LongAdder源碼
addCount
方法作用:每次添加元素后,元素數量加1,并判斷是否達到擴容門檻,達到了則進行擴容或協(xié)助擴容。
/** * Map中的元素數據量累加方法:有額外的以下功能 * 1.統(tǒng)計當前table一共有多少數據 * 2.并判斷是否達到擴容閾值標準,觸發(fā)擴容 */ private final void addCount(long x, int check) { // as 表示 LongAdder.cells // b 表示LongAdder.base // s 表示當前map.table中元素的數量 CounterCell[] as; long b, s; // -------------------------統(tǒng)計當前table一共有多少數據---------------------------------- // CASE1: // (as = counterCells) != null // 條件一:true->表示cells已經初始化了,當前線程應該去使用hash尋址找到合適的cell 去累加數據 // false->表示當前線程應該直接將數據累加到base(沒有線程競爭) // !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) // 條件二:false->表示寫base成功,數據累加到base中了,當前競爭不激烈,不需要創(chuàng)建cells // true->表示寫base失敗,與其他線程在base上發(fā)生了競爭,當前線程應該去嘗試創(chuàng)建cells。 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 有幾種情況進入到if塊中? // 條件一為true->表示cells已經初始化了,當前線程應該去使用hash尋址找到合適的cell去累加數據 // 條件二為true->表示寫base失敗,與其他線程在base上發(fā)生了競爭,當前線程應該去嘗試創(chuàng)建cells。 // a 表示當前線程hash尋址命中的cell CounterCell a; // v 表示當前線程寫cell時的期望值 long v; // m 表示當前cells數組的長度 int m; // true -> 未發(fā)生線程競爭 false->發(fā)生線程競爭 boolean uncontended = true; // --------------------------------------------------------------------------- // CASE2: // 條件一:as == null || (m = as.length - 1) < 0 // true-> 表示當前線程是通過寫base競爭失敗(CASE1的條件二),然后進入的if塊,就需要調用fullAddCount方法去擴容或者重試.. // (fullAddCount方法就類似于LongAdder.longAccumulate方法) // 條件二:a = as[ThreadLocalRandom.getProbe() & m]) == null 前置條件:cells已經初始化了 // true->表示當前線程命中的cell表格是個空的,需要當前線程進入fullAddCount方法去初始化cell,放入當前位置. // 條件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x) // 前置條件:條件二中當前線程命中的cell表格不是空的~ // false->取反得到false,表示當前線程使用cas方式更新當前命中的cell成功 // true->取反得到true,表示當前線程使用cas方式更新當前命中的cell失敗,需要進入fullAddCount進行重試或者擴容cells。 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) ) { fullAddCount(x, uncontended); // 考慮到fullAddCount里面的事情比較累,就讓當前線程不參與到擴容相關的邏輯了,直接返回到調用點。 return; } if (check <= 1) return; // sumCount統(tǒng)計當前散列表元素個數sum = base + cells[0] + ... + cells[n],這是一個期望值 s = sumCount(); } // -------------------------判斷是否達到擴容閾值標準,觸發(fā)擴容---------------------------- // CASE3: // check >= 0表示一定是一個put操作調用的addCount,check < 0是remove操作調用的addCount方法 if (check >= 0) { // tab 表示 map.table // nt 表示 map.nextTable // n 表示map.table數組的長度 // sc 表示sizeCtl的臨時值 Node<K,V>[] tab, nt; int n, sc; /** * sizeCtl < 0 * 1. -1 表示當前table正在初始化(有線程在創(chuàng)建table數組),當前線程需要自旋等待.. * 2.表示當前table數組正在進行擴容 ,高16位表示:擴容的標識戳 低16位表示:(1 + nThread) 當前參與并發(fā)擴容的線程數量 * * sizeCtl = 0,表示創(chuàng)建table數組時 使用DEFAULT_CAPACITY為大小 * * sizeCtl > 0 * * 1. 如果table未初始化,表示初始化大小 * 2. 如果table已經初始化,表示下次擴容時的 觸發(fā)條件(閾值) */ // 有條件自旋~ // 條件一:s >= (long)(sc = sizeCtl) // true -> 1.當前sizeCtl為一個負數 表示正在擴容中.. // 2.當前sizeCtl是一個正數,表示擴容閾值,但是s已經達到擴容閾值(需要擴容) // false -> 表示當前table尚未達到擴容閾值條件(不需要擴容) // 條件二:(tab = table) != null 恒成立 true // 條件三:(n = tab.length) < MAXIMUM_CAPACITY // true -> 當前table長度小于最大值限制,則可以進行擴容。 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 獲取擴容唯一標識戳 // eg: 16 -> 32 擴容標識戳為:1000 0000 0001 1011 // 什么意思呢? // 即,所有將map容量由16擴容到32的線程,其拿到的擴容唯一標識戳都是1000 0000 0001 1011 int rs = resizeStamp(n); // -------------------------------------------------------------------------- // CASE4: // 條件成立:表示當前table正在擴容,則,當前線程理論上應該協(xié)助table完成擴容 if (sc < 0) { // -------------------------------------------------------------- // CASE2: 條件1~4只要有個為true就會跳出循環(huán),不會繼續(xù)擴容~ // 條件一:(sc >>> RESIZE_STAMP_SHIFT) != rs // true -> 說明當前線程獲取到的擴容唯一標識戳 非 本批次擴容 // false -> 說明當前線程獲取到的擴容唯一標識戳 是 本批次擴容 // 條件二:JDK1.8 中有bug,jira已經提出來了,其實想表達的是 sc == (rs << 16 ) + 1 // true -> 表示擴容完畢,當前線程不需要再參與進來了 // false -> 擴容還在進行中,當前線程可以參與 // 條件三:JDK1.8 中有bug,jira已經提出來了,其實想表達的是: // sc == (rs << 16) + MAX_RESIZERS // true-> 表示當前參與并發(fā)擴容的線程達到了最大值 65535 - 1 // false->表示當前線程可以參與進來 //條件四:(nt = nextTable) == null // true -> 表示本次擴容結束 // false -> 擴容正在進行中 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // 條件1~4只要有個為true就會跳出循環(huán),不會繼續(xù)擴容~ break; // -------------------------------------------------------------- // CASE5: // 前置條件:當前table正在執(zhí)行擴容中(即,CASE4沒有被通過)當前線程有機會參與進擴容。 // 條件成立:說明當前線程成功參與到擴容任務中,并且將sc低16位值加1,表示多了一個線程參與工作~ // 條件失?。? // 1.當前有很多線程都在此處嘗試修改sizeCtl,有其它一個線程修改成功了 // 導致你的sc期望值與內存中的值不一致,CAS修改失敗。(下次自選大概率不會在來到這里~) // 2.transfer任務內部的線程也修改了sizeCtl。 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 擴容(遷移數據):協(xié)助擴容線程,持有nextTable參數,進入該方法協(xié)助擴容~ transfer(tab, nt); } // -------------------------------------------------------------------------- // CASE6: 以CAS的方式去更新sc,更新成功返回true,否則返回false // 條件成立,說明當前線程是觸發(fā)擴容的**第一個**線程,在transfer方法需要做一些擴容準備工作: // rs << RESIZE_STAMP_SHIFT:將擴容唯一標識戳左移16位 eg: // 1000 0000 0001 1011 << 16 得到 1000 0000 0001 1011 0000 0000 0000 0000 // 緊接這 (rs << RESIZE_STAMP_SHIFT) + 2)操作: // 1000 0000 0001 1011 0000 0000 0000 0000 + 2 // => 1000 0000 0001 1011 0000 0000 0000 0010,這個二進制數有如下含義: // sizeCtl的高16位: 1000 0000 0001 1011 表示當前的擴容標識戳 // sizeCtl的低16位: 0000 0000 0000 0010 表示(1 + nThread),即,目前有多少個線程正在參與擴容~ // 那么這里的n是怎么來的呢? // eg: (rs << RESIZE_STAMP_SHIFT) + 2 這里的2,就是1 + 1,后面的1就是對1*Thread,即(1+1*Threads) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 擴容(遷移數據):觸發(fā)擴容條件的線程 不持有nextTable transfer(tab, null); // 重新計算table中的元素個數 s = sumCount(); } } }
4、總結
(1)元素個數的存儲方式類似于LongAdder類,存儲在不同的段上,減少不同線程同時更新size時的沖突;
(2)計算元素個數時把這些段的值及baseCount相加算出總的元素個數;
(3)正常情況下sizeCtl存儲著擴容門檻,擴容門檻為容量的0.75倍;
(4)擴容時sizeCtl高位存儲擴容郵戳(resizeStamp),低位存儲擴容線程數加1(1+nThreads);
(5)其它線程添加元素后如果發(fā)現(xiàn)存在擴容,也會加入的擴容行列中來;
以上就是ConcurrentHashMap源碼的put存入數據的方法以及相關方法,由于transfer 遷移數據這個方法比較復雜,我們放在下一篇文章中單獨分析~也希望大家多多關注腳本之家的其他內容!
相關文章
MyBatis-Plus中MetaObjectHandler沒生效完美解決
在進行測試時發(fā)現(xiàn)配置的MyMetaObjectHandler并沒有生效,本文主要介紹了MyBatis-Plus中MetaObjectHandler沒生效完美解決,具有一定的參考價值,感興趣的可以了解一下2023-11-11java根據當前時間獲取yyyy-MM-dd?HH:mm:ss標準格式的時間代碼示例
在Java中可以使用java.time包中的LocalDateTime類和DateTimeFormatter類來獲取并格式化當前時間為yyyy-MM-dd?HH:mm:ss的格式,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-10-10@Transaction,@Async在同一個類中注解失效的原因分析及解決
這篇文章主要介紹了@Transaction,@Async在同一個類中注解失效的原因分析及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12