Java同步容器和并發(fā)容器詳解
同步容器
在 Java 中,同步容器主要包括 2 類:
- Vector、Stack、HashTableCollections 類中提供的靜態(tài)工廠方法創(chuàng)建的類(由 Collections.synchronizedXxxx 等方法)
- Collections類中提供的靜態(tài)工廠方法創(chuàng)建的類
Vector 實現(xiàn)了 List 接口,Vector 實際上就是一個數(shù)組,和 ArrayList 類似,但是Vector 中的方法都是 synchronized 方法,即進(jìn)行了同步措施。
Stack 也是一個同步容器,它的方法也用 synchronized 進(jìn)行了同步,它實際上是繼承于 Vector 類。
HashTable 實現(xiàn)了 Map 接口,它和 HashMap 很相似,但是 HashTable 進(jìn)行了同步處理,而 HashMap 沒有。
同步容器的缺陷
同步容器的同步原理就是在方法上用 synchronized 修飾。那么,這些方法每次只允許一個線程調(diào)用執(zhí)行。
性能問題
由于被 synchronized 修飾的方法,每次只允許一個線程執(zhí)行,其他試圖訪問這個方法的線程只能等待。顯然,這種方式比沒有使用 synchronized 的容器性能要差。
安全問題
同步容器真的一定安全嗎?
答案是:未必。同步容器未必真的安全。在做復(fù)合操作時,仍然需要加鎖來保護(hù)。
常見復(fù)合操作如下:
- 迭代:反復(fù)訪問元素,直到遍歷完全部元素;
- 跳轉(zhuǎn):根據(jù)指定順序?qū)ふ耶?dāng)前元素的下一個(下 n 個)元素;
- 條件運(yùn)算:例如若沒有則添加等;
不安全的示例
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class Test { static Vector<Integer> vector = new Vector<Integer>(); public static void main(String[] args) throws InterruptedException { while(true) { for (int i=0;i<10;i++) vector.add(i); Thread thread1 = new Thread(){ public void run() { for (int i=0;i<vector.size();i++) vector.remove(i); } ; } ; Thread thread2 = new Thread(){ public void run() { for (int i=0;i<vector.size();i++) vector.get(i); } ; } ; thread1.start(); thread2.start(); while(Thread.activeCount()>10) { } } } } </pre>
執(zhí)行時可能會出現(xiàn)數(shù)組越界錯誤。
Vector 是線程安全的,為什么還會報這個錯?很簡單,對于 Vector,雖然能保證每一個時刻只能有一個線程訪問它,但是不排除這種可能:
當(dāng)某個線程在某個時刻執(zhí)行這句時:
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">for (int i=0;i<vector.size();i++) vector.get(i); </pre>
假若此時 vector 的 size 方法返回的是 10,i 的值為 9
然后另外一個線程執(zhí)行了這句:
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">for (int i=0;i<vector.size();i++) vector.remove(i); </pre>
將下標(biāo)為 9 的元素刪除了。
那么通過 get 方法訪問下標(biāo)為 9 的元素肯定就會出問題了。
安全示例
因此為了保證線程安全,必須在方法調(diào)用端做額外的同步措施,如下面所示:
public class Test { static Vector<Integer> vector = new Vector<Integer>(); public static void main(String[] args) throws InterruptedException { while(true) { for (int i=0;i<10;i++) vector.add(i); Thread thread1 = new Thread(){ public void run() { synchronized (Test.class) { //進(jìn)行額外的同步 for (int i=0;i<vector.size();i++) vector.remove(i); } } ; } ; Thread thread2 = new Thread(){ public void run() { synchronized (Test.class) { for (int i=0;i<vector.size();i++) vector.get(i); } } ; } ; thread1.start(); thread2.start(); while(Thread.activeCount()>10) { } } } }
ConcurrentModificationException 異常
在對 Vector 等容器并發(fā)地進(jìn)行迭代修改時,會報 ConcurrentModificationException 異常,關(guān)于這個異常將會在后續(xù)文章中講述。
但是在并發(fā)容器中不會出現(xiàn)這個問題。
并發(fā)容器
JDK 的 java.util.concurrent 包(即 juc)中提供了幾個非常有用的并發(fā)容器。
- CopyOnWriteArrayList - 線程安全的 ArrayList
- CopyOnWriteArraySet - 線程安全的 Set,它內(nèi)部包含了一個 CopyOnWriteArrayList,因此本質(zhì)上是由 CopyOnWriteArrayList 實現(xiàn)的。
- ConcurrentSkipListSet - 相當(dāng)于線程安全的 TreeSet。它是有序的 Set。它由 ConcurrentSkipListMap 實現(xiàn)。
- ConcurrentHashMap - 線程安全的 HashMap。采用分段鎖實現(xiàn)高效并發(fā)。
- ConcurrentSkipListMap - 線程安全的有序 Map。使用跳表實現(xiàn)高效并發(fā)。
- ConcurrentLinkedQueue - 線程安全的無界隊列。底層采用單鏈表。支持 FIFO。
- ConcurrentLinkedDeque - 線程安全的無界雙端隊列。底層采用雙向鏈表。支持 FIFO 和 FILO。
- ArrayBlockingQueue - 數(shù)組實現(xiàn)的阻塞隊列。
- LinkedBlockingQueue - 鏈表實現(xiàn)的阻塞隊列。
- LinkedBlockingDeque - 雙向鏈表實現(xiàn)的雙端阻塞隊列。
ConcurrentHashMap
要點
作用:ConcurrentHashMap 是線程安全的 HashMap。
原理:JDK6 與 JDK7 中,ConcurrentHashMap 采用了分段鎖機(jī)制。JDK8 中,摒棄了鎖分段機(jī)制,改為利用 CAS 算法。
源碼
JDK7
ConcurrentHashMap 類在 jdk1.7 中的設(shè)計,其基本結(jié)構(gòu)如圖所示:
每一個 segment 都是一個 HashEntry<K,V>[] table, table 中的每一個元素本質(zhì)上都是一個 HashEntry 的單向隊列。比如 table[3]為首節(jié)點,table[3]->next 為節(jié)點 1,之后為節(jié)點 2,依次類推。
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { // 將整個hashmap分成幾個小的map,每個segment都是一個鎖;與hashtable相比,這么設(shè)計的目的是對于put, remove等操作,可以減少并發(fā)沖突,對 // 不屬于同一個片段的節(jié)點可以并發(fā)操作,大大提高了性能 final Segment<K,V>[] segments; // 本質(zhì)上Segment類就是一個小的hashmap,里面table數(shù)組存儲了各個節(jié)點的數(shù)據(jù),繼承了ReentrantLock, 可以作為互拆鎖使用 static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; } // 基本節(jié)點,存儲Key, Value值 static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; } } </pre>
JDK8
jdk8 中主要做了 2 方面的改進(jìn)
- 取消 segments 字段,直接采用 transient volatile HashEntry<K,V>[] table 保存數(shù)據(jù),采用 table 數(shù)組元素作為鎖,從而實現(xiàn)了對每一行數(shù)據(jù)進(jìn)行加鎖,進(jìn)一步減少并發(fā)沖突的概率。
- 將原先 table 數(shù)組+單向鏈表的數(shù)據(jù)結(jié)構(gòu),變更為 table 數(shù)組+單向鏈表+紅黑樹的結(jié)構(gòu)。
對于 hash 表來說,最核心的能力在于將 key hash 之后能均勻的分布在數(shù)組中。如果 hash 之后散列的很均勻,那么 table 數(shù)組中的每個隊列長度主要為 0 或者 1。
但實際情況并非總是如此理想,雖然 ConcurrentHashMap 類默認(rèn)的加載因子為 0.75,但是在數(shù)據(jù)量過大或者運(yùn)氣不佳的情況下,還是會存在一些隊列長度過長的情況,如果還是采用單向列表方式,那么查詢某個節(jié)點的時間復(fù)雜度為 O(n);
因此,對于個數(shù)超過 8(默認(rèn)值)的列表,jdk1.8 中采用了紅黑樹的結(jié)構(gòu),那么查詢的時間復(fù)雜度可以降低到 O(logN),可以改進(jìn)性能。
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">final V putVal(K key, V value, Boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果table為空,初始化;否則,根據(jù)hash值計算得到數(shù)組索引i,如果tab[i]為空,直接新建節(jié)點Node即可。注:tab[i]實質(zhì)為鏈表或者紅黑樹的首節(jié)點。 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果tab[i]不為空并且hash值為MOVED,說明該鏈表正在進(jìn)行transfer操作,返回擴(kuò)容完成后的table。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 針對首個節(jié)點進(jìn)行加鎖操作,而不是segment,進(jìn)一步減少線程沖突 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 如果在鏈表中找到值為key的節(jié)點e,直接設(shè)置e.val = value即可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 如果沒有找到值為key的節(jié)點,直接新建Node并加入鏈表即可。 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果首節(jié)點為TreeBin類型,說明為紅黑樹結(jié)構(gòu),執(zhí)行putTreeVal操作。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果節(jié)點數(shù)>=8,那么轉(zhuǎn)換鏈表結(jié)構(gòu)為紅黑樹結(jié)構(gòu)。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 計數(shù)增加1,有可能觸發(fā)transfer操作(擴(kuò)容)。 addCount(1L, binCount); return null; } </pre>
示例
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class ConcurrentHashMapDemo { public static void main(String[] args) throws InterruptedException { // HashMap 在并發(fā)迭代訪問時會拋出 ConcurrentModificationException 異常 // Map<Integer, Character> map = new HashMap<>(); Map<Integer, Character> map = new ConcurrentHashMap<>(); Thread wthread = new Thread(() -> { System.out.println("寫操作線程開始執(zhí)行"); for (int i = 0; i < 26; i++) { map.put(i, (char) ('a' + i)); } }); Thread rthread = new Thread(() -> { System.out.println("讀操作線程開始執(zhí)行"); for (Integer key : map.keySet()) { System.out.println(key + " - " + map.get(key)); } }); wthread.start(); rthread.start(); Thread.sleep(1000); } }</pre>
CopyOnWriteArrayList
要點
作用:CopyOnWrite 字面意思為寫入時復(fù)制。CopyOnWriteArrayList 是線程安全的 ArrayList。
原理:
- 在 CopyOnWriteAarrayList 中,讀操作不同步,因為它們在內(nèi)部數(shù)組的快照上工作,所以多個迭代器可以同時遍歷而不會相互阻塞(1,2,4)。
- 所有的寫操作都是同步的。他們在備份數(shù)組(3)的副本上工作。寫操作完成后,后備陣列將被替換為復(fù)制的陣列,并釋放鎖定。支持?jǐn)?shù)組變得易變,所以替換數(shù)組的調(diào)用是原子(5)。
- 寫操作后創(chuàng)建的迭代器將能夠看到修改的結(jié)構(gòu)(6,7)。
- 寫時復(fù)制集合返回的迭代器不會拋出 ConcurrentModificationException,因為它們在數(shù)組的快照上工作,并且無論后續(xù)的修改(2,4)如何,都會像迭代器創(chuàng)建時那樣完全返回元素。
源碼
重要屬性
- lock - 執(zhí)行寫時復(fù)制操作,需要使用可重入鎖加鎖
- array - 對象數(shù)組,用于存放元素
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;"> /** The lock protecting all mutators */ final transient ReentrantLock lock = new ReentrantLock(); /** The array, accessed only via getArray/setArray. */ private transient volatile Object[] array; </pre>
重要方法
添加操作
添加的邏輯很簡單,先將原容器 copy 一份,然后在新副本上執(zhí)行寫操作,之后再切換引用。當(dāng)然此過程是要加鎖的。
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public Boolean add(E e) { //ReentrantLock加鎖,保證線程安全 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //拷貝原容器,長度為原容器長度加一 Object[] newElements = Arrays.copyOf(elements, len + 1); //在新副本上執(zhí)行添加操作 newElements[len] = e; //將原容器引用指向新副本 setArray(newElements); return true; } finally { //解鎖 lock.unlock(); } } </pre>
刪除操作
刪除操作同理,將除要刪除元素之外的其他元素拷貝到新副本中,然后切換引用,將原容器引用指向新副本。同屬寫操作,需要加鎖。
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public E remove(int index) { //加鎖 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) //如果要刪除的是列表末端數(shù)據(jù),拷貝前l(fā)en-1個數(shù)據(jù)到新副本上,再切換引用 setArray(Arrays.copyOf(elements, len - 1)); else { //否則,將除要刪除元素之外的其他元素拷貝到新副本中,并切換引用 Object[] newElements = new Object[len - 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { //解鎖 lock.unlock(); } } </pre>
讀操作
CopyOnWriteArrayList 的讀操作是不用加鎖的,性能很高。
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public E get(int index) { return get(getArray(), index); } private E get(Object[] a, int index) { return (E) a[index]; } </pre>
示例
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class CopyOnWriteArrayListDemo { static class ReadTask implements Runnable { List<String> list; ReadTask(List<String> list) { this.list = list; } public void run() { for (String str : list) { System.out.println(str); } } } static class WriteTask implements Runnable { List<String> list; int index; WriteTask(List<String> list, int index) { this.list = list; this.index = index; } public void run() { list.remove(index); list.add(index, "write_" + index); } } public void run() { final int NUM = 10; // ArrayList 在并發(fā)迭代訪問時會拋出 ConcurrentModificationException 異常 // List<String> list = new ArrayList<>(); CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < NUM; i++) { list.add("main_" + i); } ExecutorService executorService = Executors.newFixedThreadPool(NUM); for (int i = 0; i < NUM; i++) { executorService.execute(new ReadTask(list)); executorService.execute(new WriteTask(list, i)); } executorService.shutdown(); } public static void main(String[] args) { new CopyOnWriteArrayListDemo().run(); } } </pre>
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot使用thymeleaf動態(tài)模板實現(xiàn)刷新
這篇文章主要介紹了Springboot使用thymeleaf動態(tài)模板實現(xiàn)刷新,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08Netty網(wǎng)絡(luò)編程實戰(zhàn)之搭建Netty服務(wù)器
Netty是JBOSS開源的一款NIO網(wǎng)絡(luò)編程框架,可用于快速開發(fā)網(wǎng)絡(luò)的應(yīng)用。Netty是一個異步的、基于事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,用于快速開發(fā)高性能的服務(wù)端和客戶端。本文將詳細(xì)說說如何搭建Netty服務(wù)器,需要的可以參考一下2022-10-10Hibernate雙向一對一映射關(guān)系配置代碼實例
這篇文章主要介紹了Hibernate雙向一對一映射關(guān)系配置代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-10-10SpringBoot集成WebSocket【基于純H5】進(jìn)行點對點[一對一]和廣播[一對多]實時推送
這篇文章主要介紹了SpringBoot集成WebSocket【基于純H5】進(jìn)行點對點[一對一]和廣播[一對多]實時推送,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-08-08Spring Boot連接超時導(dǎo)致502錯誤的實戰(zhàn)案例
這篇文章主要給大家介紹了關(guān)于Spring Boot連接超時導(dǎo)致502錯誤的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09Springboot程序在使用Maven下載依賴時失效的解決方法
以前能成功創(chuàng)建SpringBoot項目并運(yùn)行,但是現(xiàn)在再創(chuàng)建新項目時Maven下載依賴失敗,且maven依賴刷新按鍵一直沒反應(yīng),所以本文給大家介紹了Springboot程序在使用Maven下載依賴時失效的解決方法,需要的朋友可以參考下2024-05-05Java實現(xiàn)等待所有子線程結(jié)束后再執(zhí)行一段代碼的方法
這篇文章主要介紹了Java實現(xiàn)等待所有子線程結(jié)束后再執(zhí)行一段代碼的方法,涉及java多線程的線程等待與執(zhí)行等相關(guān)操作技巧,需要的朋友可以參考下2017-08-08