如何使用JCTools實(shí)現(xiàn)Java并發(fā)程序
概述
在本文中,我們將介紹JCTools(Java并發(fā)工具)庫(kù)。
簡(jiǎn)單地說(shuō),這提供了許多適用于多線程環(huán)境的實(shí)用數(shù)據(jù)結(jié)構(gòu)。
非阻塞算法
傳統(tǒng)上,在可變共享狀態(tài)下工作的多線程代碼使用鎖來(lái)確保數(shù)據(jù)一致性和發(fā)布(一個(gè)線程所做的更改對(duì)另一個(gè)線程可見(jiàn))。
這種方法有許多缺點(diǎn):
- 線程在試圖獲取鎖時(shí)可能會(huì)被阻塞,在另一個(gè)線程的操作完成之前不會(huì)取得任何進(jìn)展—這有效地防止了并行性
- 鎖爭(zhēng)用越重,JVM處理調(diào)度線程、管理爭(zhēng)用和等待線程隊(duì)列的時(shí)間就越多,實(shí)際工作就越少
- 如果涉及多個(gè)鎖,并且它們以錯(cuò)誤的順序獲取/釋放,則可能出現(xiàn)死鎖
- 優(yōu)先級(jí)反轉(zhuǎn)的危險(xiǎn)是可能的——高優(yōu)先級(jí)線程被鎖定,試圖獲得由低優(yōu)先級(jí)線程持有的鎖
- 大多數(shù)情況下,使用粗粒度鎖會(huì)嚴(yán)重?fù)p害并行性—細(xì)粒度鎖需要更仔細(xì)的設(shè)計(jì),增加鎖開(kāi)銷,并且更容易出錯(cuò)
另一種方法是使用非阻塞算法,即任何線程的故障或掛起都不會(huì)導(dǎo)致另一個(gè)線程的故障或掛起的算法。
如果所涉及的線程中至少有一個(gè)能夠在任意時(shí)間段內(nèi)取得進(jìn)展,即在處理過(guò)程中不會(huì)出現(xiàn)死鎖,則非阻塞算法是無(wú)鎖的。
此外,如果保證每個(gè)線程的進(jìn)程,這些算法是無(wú)等待的。
下面是一個(gè)非阻塞堆棧示例,它定義了基本狀態(tài):
public class ConcurrentStack<E> { AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); private static class Node <E> { public E item; public Node<E> next; // standard constructor } }
還有一些API方法:
public void push(E item){ Node<E> newHead = new Node<E>(item); Node<E> oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead)); } public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }
我們可以看到,該算法使用細(xì)粒度比較和交換(CAS)指令,并且是無(wú)鎖的(即使多個(gè)線程調(diào)用top.compareAndSet()同時(shí),它們中的一個(gè)保證會(huì)成功)但不能無(wú)等待,因?yàn)椴荒鼙WCCAS最終會(huì)對(duì)任何特定線程成功。
依賴
首先,讓我們將JCTools依賴項(xiàng)添加到pom.xml文件:
<dependency> <groupId>org.jctools</groupId> <artifactId>jctools-core</artifactId> <version>2.1.2</version> </dependency>
請(qǐng)注意,Maven Central上提供了最新的可用版本。
JCTools隊(duì)列
該庫(kù)提供了許多隊(duì)列以在多線程環(huán)境中使用,即一個(gè)或多個(gè)線程以線程安全的無(wú)鎖方式寫入隊(duì)列,一個(gè)或多個(gè)線程以線程安全的無(wú)鎖方式從隊(duì)列中讀取。
所有隊(duì)列實(shí)現(xiàn)的通用接口是org.jctools.queues.MessagePassingQueue。
隊(duì)列類型
所有隊(duì)列都可以根據(jù)其生產(chǎn)者/消費(fèi)者策略進(jìn)行分類:
- 單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者–此類類使用前綴Spsc命名,例如SpscArrayQueue
- 單個(gè)生產(chǎn)者,多個(gè)消費(fèi)者–使用Spmc前綴,例如SpmcArrayQueue
- 多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者-使用Mpsc前綴,例如MpscArrayQueue
- 多個(gè)生產(chǎn)者、多個(gè)消費(fèi)者—使用Mpmc前綴,例如MpmcArrayQueue
需要注意的是,在內(nèi)部沒(méi)有策略檢查,也就是說(shuō),如果使用不正確,隊(duì)列可能會(huì)無(wú)聲地發(fā)生故障。
例如,下面的測(cè)試從兩個(gè)線程填充單個(gè)生產(chǎn)者隊(duì)列并通過(guò),即使不能保證使用者看到來(lái)自不同生產(chǎn)者的數(shù)據(jù):
SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2); Thread producer1 = new Thread(() -> queue.offer(1)); producer1.start(); producer1.join(); Thread producer2 = new Thread(() -> queue.offer(2)); producer2.start(); producer2.join(); Set<Integer> fromQueue = new HashSet<>(); Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); consumer.start(); consumer.join(); assertThat(fromQueue).containsOnly(1, 2);
隊(duì)列實(shí)現(xiàn)
總結(jié)以上分類,以下是JCTools隊(duì)列列表:
- SpscArrayQueue–單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,在內(nèi)部使用一個(gè)數(shù)組,限制容量
- SpscLinkedQueue–單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,內(nèi)部使用鏈表,未綁定容量
- SpscChunkedArrayQueue–單生產(chǎn)商、單消費(fèi)者,從初始容量開(kāi)始,一直增長(zhǎng)到最大容量
- SpscGrowableArrayQueue–單生產(chǎn)者、單消費(fèi)者,從初始容量開(kāi)始,一直增長(zhǎng)到最大容量。這與SpscChunkedArrayQueue是相同的契約,唯一的區(qū)別是內(nèi)部塊管理。建議使用SpscChunkedArrayQueue,因?yàn)樗幸粋€(gè)簡(jiǎn)化的實(shí)現(xiàn)
- SpscUnboundedArrayQueue–單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,在內(nèi)部使用數(shù)組,未綁定容量
- SpmcArrayQueue–單個(gè)生產(chǎn)者、多個(gè)使用者,在內(nèi)部使用一個(gè)陣列,限制容量
- MpscArrayQueue—多個(gè)生產(chǎn)者、單個(gè)消費(fèi)者在內(nèi)部使用一個(gè)陣列,限制容量
- MpscLinkedQueue–多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,在內(nèi)部使用鏈表,未綁定容量
- MpmcArrayQueue—多個(gè)生產(chǎn)者、多個(gè)消費(fèi)者在內(nèi)部使用一個(gè)陣列,限制容量
原子隊(duì)列
前面提到的所有隊(duì)列都使用sun.misc.Unsafe. 然而,隨著java9和JEP-260的出現(xiàn),這個(gè)API在默認(rèn)情況下變得不可訪問(wèn)。
因此,有其他隊(duì)列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能較差)而不是sun.misc.Unsafe.
它們是從上面的隊(duì)列生成的,它們的名稱中間插入了單詞Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。
如果可能,建議使用“常規(guī)”隊(duì)列,并且僅在sun.misc.Unsafe像Hot Java9+和JRockit一樣被禁止/無(wú)效。
容量
所有JCTools隊(duì)列也可能具有最大容量或未綁定。當(dāng)隊(duì)列已滿且受容量限制時(shí),它將停止接受新元素。
在以下示例中,我們:
- 填滿隊(duì)列
- 確保在此之后停止接受新元素
- 從中排出,并確保之后可以添加更多元素
請(qǐng)注意,為了可讀性,刪除了幾個(gè)代碼語(yǔ)句。
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16); CountDownLatch startConsuming = new CountDownLatch(1); CountDownLatch awakeProducer = new CountDownLatch(1); Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue(); }); producer.start(); startConsuming.await(); Set<Integer> fromQueue = new HashSet<>(); queue.drain(fromQueue::add); awakeProducer.countDown(); producer.join(); queue.drain(fromQueue::add); assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));
其他數(shù)據(jù)結(jié)構(gòu)工具
JCTools還提供了一些非隊(duì)列數(shù)據(jù)結(jié)構(gòu)。
它們都列在下面:
- NonBlockingHashMap–一個(gè)無(wú)鎖的ConcurrentHashMap替代方案,具有更好的伸縮性和通常更低的突變成本。它是實(shí)現(xiàn)sun.misc.Unsafe,因此,不建議在Java9+或JRockit環(huán)境中使用此類
- NonBlockingHashMapLong–與NonBlockingHashMap類似,但使用基本長(zhǎng)鍵
- NonBlockingHashSet–一個(gè)簡(jiǎn)單的包裝器,圍繞著像JDK的java.util.Collections.newSetFromMap()一樣的NonBlockingHashMap
- NonBlockingIdentityHashMap–與NonBlockingHashMap類似,但按標(biāo)識(shí)比較鍵。
- NonBlockingSetInt–一個(gè)多線程位向量集,實(shí)現(xiàn)為一個(gè)原始long數(shù)組。在無(wú)聲自動(dòng)裝箱的情況下工作無(wú)效
性能測(cè)試
讓我們使用JMH來(lái)比較JDK的ArrayBlockingQueue和JCTools隊(duì)列的性能。JMH是Sun/Oracle JVM gurus提供的一個(gè)開(kāi)源微基準(zhǔn)框架,它保護(hù)我們不受編譯器/JVM優(yōu)化算法的不確定性的影響。
請(qǐng)注意,為了提高可讀性,下面的代碼段遺漏了幾個(gè)語(yǔ)句。
public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue<Long> queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } } }
結(jié)果:
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op
我們可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了兩倍。
使用JCTools的缺點(diǎn)
使用JCTools有一個(gè)重要的缺點(diǎn)——不可能強(qiáng)制正確使用庫(kù)類。例如,考慮在我們的大型成熟項(xiàng)目中開(kāi)始使用MpscArrayQueue的情況(注意,必須有一個(gè)使用者)。
不幸的是,由于項(xiàng)目很大,有可能有人出現(xiàn)編程或配置錯(cuò)誤,現(xiàn)在從多個(gè)線程讀取隊(duì)列。這個(gè)系統(tǒng)看起來(lái)像以前一樣工作,但現(xiàn)在有可能消費(fèi)者錯(cuò)過(guò)了一些信息。這是一個(gè)真正的問(wèn)題,可能會(huì)有很大的影響,是很難調(diào)試。
理想情況下,應(yīng)該可以運(yùn)行具有特定系統(tǒng)屬性的系統(tǒng),該屬性強(qiáng)制JCTools確保線程訪問(wèn)策略。例如,本地/測(cè)試/暫存環(huán)境(而不是生產(chǎn)環(huán)境)可能已啟用它。遺憾的是,JCTools沒(méi)有提供這樣的屬性。
另一個(gè)需要考慮的問(wèn)題是,盡管我們確保JCTools比JDK的對(duì)應(yīng)工具快得多,但這并不意味著我們的應(yīng)用程序獲得了與我們開(kāi)始使用自定義隊(duì)列實(shí)現(xiàn)時(shí)相同的速度。大多數(shù)應(yīng)用程序不會(huì)在線程之間交換很多對(duì)象,而且大多是I/O綁定的。
結(jié)論
現(xiàn)在,我們對(duì)JCTools提供的實(shí)用程序類有了基本的了解,并了解了它們?cè)谥剌d下與JDK的對(duì)應(yīng)類相比的性能。
總之,只有當(dāng)我們?cè)诰€程之間交換大量對(duì)象時(shí),才有必要使用該庫(kù),即使這樣,也有必要非常小心地保留線程訪問(wèn)策略。
以上示例的完整源代碼地址:https://github.com/eugenp/tutorials/tree/master/libraries-5
JCTools git地址:https://github.com/JCTools/JCTools
以上就是如何使用JCTools實(shí)現(xiàn)Java并發(fā)程序的詳細(xì)內(nèi)容,更多關(guān)于使用JCTools實(shí)現(xiàn)Java并發(fā)程序的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java 實(shí)現(xiàn)并發(fā)的幾種方式小結(jié)
- Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解
- JAVA并發(fā)中VOLATILE關(guān)鍵字的神奇之處詳解
- Java并發(fā)編程之CountDownLatch源碼解析
- Java并發(fā)編程之線程之間的共享和協(xié)作
- Java并發(fā)編程之Exchanger方法詳解
- java關(guān)于并發(fā)模型中的兩種鎖知識(shí)點(diǎn)詳解
- Java并發(fā)編程之ReadWriteLock讀寫鎖的操作方法
- Java并發(fā)(Runnable+Thread)實(shí)現(xiàn)硬盤文件搜索功能
- Java高并發(fā)BlockingQueue重要的實(shí)現(xiàn)類詳解
- Java基礎(chǔ)之并發(fā)相關(guān)知識(shí)總結(jié)
相關(guān)文章
IntelliJ IDEA : .java文件左下角顯示"J"圖標(biāo)的問(wèn)題
IntelliJ IDEA : .java文件 左下角顯示“J”,并且不能執(zhí)行代碼,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2020-10-10Java guava monitor監(jiān)視器線程的使用詳解
工作中的場(chǎng)景中是否存在類似這樣的場(chǎng)景,需要提交的線程在某個(gè)觸發(fā)條件下執(zhí)行。本文主要就是使用guava中的monitor來(lái)優(yōu)雅的實(shí)現(xiàn)帶監(jiān)視器的線程2021-11-11SpringBoot4.5.2 整合HikariCP 數(shù)據(jù)庫(kù)連接池操作
這篇文章主要介紹了SpringBoot4.5.2 整合HikariCP 數(shù)據(jù)庫(kù)連接池操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09SpringBoot集成Redis使用Cache緩存的實(shí)現(xiàn)方法
SpringBoot通過(guò)配置RedisConfig類和使用Cache注解可以輕松集成Redis實(shí)現(xiàn)緩存,主要包括@EnableCaching開(kāi)啟緩存,自定義key生成器,改變序列化規(guī)則,以及配置RedisCacheManager,本文為使用SpringBoot與Redis處理緩存提供了詳實(shí)的指導(dǎo)和示例,感興趣的朋友一起看看吧2024-10-10基于kafka實(shí)現(xiàn)Spring Cloud Bus消息總線
消息總線是一種通信工具,可以在機(jī)器之間互相傳輸消息、文件等,這篇文章主要介紹了如何利用kafka實(shí)現(xiàn)SpringCloud Bus消息總線,感興趣的可以學(xué)習(xí)一下2022-04-04MybatisPlus?BaseMapper?實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)增刪改查源碼
MybatisPlus?是一款在?Mybatis?基礎(chǔ)上進(jìn)行的增強(qiáng)?orm?框架,可以實(shí)現(xiàn)不寫?sql?就完成數(shù)據(jù)庫(kù)相關(guān)的操作,這篇文章主要介紹了MybatisPlus?BaseMapper?實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)增刪改查源碼解析,需要的朋友可以參考下2023-01-01