Java并發(fā)之Phaser的全面解析詳解
內容概要
Phaser是Java中一個靈活的同步工具,其優(yōu)點在于支持多階段的任務拆分與同步,并且能夠動態(tài)地注冊與注銷參與者,它提供了豐富的等待與推進機制,使得開發(fā)者能夠更細粒度地控制線程的協(xié)調行為,實現復雜的并行任務處理,相比于其他同步工具,Phaser更加靈活且易于擴展,適用于多種并發(fā)場景。
核心概念
在Java中,Phaser
是一個靈活的同步工具類,它允許多個線程在一個或多個屏障(barrier points)上進行協(xié)調,可以把Phaser
想象成一個多線程聚會的組織者,它負責確保所有參與的線程都到達某個階段后再一起進行下一步。
舉一個實際生活中的場景:假設正在開發(fā)一個在線多人游戲,比如,團隊解謎游戲,在這個游戲中,有幾個玩家(線程)需要合作完成一系列任務來通關,每個任務都被劃分為幾個階段,而每個階段都需要所有玩家共同完成某些操作后才能進入下一階段。
這個場景中,Phaser
就可以發(fā)揮它的作用,可以把每個階段看作是一個屏障點,每個玩家線程在完成自己當前階段的任務后會向Phaser
報告,然后等待其他玩家完成,一旦所有玩家都完成了當前階段的任務,Phaser
就會像一個響鈴一樣,通知所有玩家可以進入下一階段了。
比如,在解謎游戲的關卡中,四個玩家需要分別找到四個不同的線索,并將這些線索組合起來才能打開通往下一關的大門,每個玩家在找到線索后,都會告知Phaser
自己已經完成任務,Phaser
會等待所有四個玩家都找到線索后,再通知他們可以將線索組合起來打開大門進入下一關了。
Phaser主要用于解決多個線程分階段共同完成任務的同步問題,它可以確保一組線程在達到某個屏障點(phase)之前都保持同步,即所有線程都完成了某個階段的任務后,才能一起進入下一個階段,這種同步機制尤其適用于需要多個線程協(xié)作完成復雜任務的情況,比如在線多人游戲、分布式系統(tǒng)、并行計算等場景。
Phaser在內部維護了一個狀態(tài)機,用來跟蹤和管理每個線程的執(zhí)行狀態(tài)以及各個階段的完成情況,當一個線程完成任務并達到屏障點時,會調用Phaser的相應方法來通知其他線程,然后等待其他線程一起進入下一階段,在這個過程中,Phaser會管理線程的同步和協(xié)作,確保所有線程都能按照預定的順序完成各自的任務。
此外,Phaser還提供了靈活的注冊和注銷線程的功能,可以動態(tài)地添加或刪除參與同步的線程,它還支持中斷和超時機制,可以在等待其他線程的過程中被中斷或設置超時,增強了對多線程同步的靈活性。
官方文檔:docx.iamqiang.com/jdk11/api/java.base/java/util/concurrent/Phaser.html
代碼案例
下面是一個簡單的Java示例代碼,演示了如何使用Phaser
來同步多個線程,以確保它們分階段完成任務,如下代碼:
import java.util.concurrent.Phaser; public class PhaserExample { public static void main(String[] args) throws InterruptedException { // 創(chuàng)建一個Phaser實例,初始時注冊3個線程(不包括主線程) Phaser phaser = new Phaser(3); // 創(chuàng)建并啟動3個線程 for (int i = 0; i < 3; i++) { int threadNum = i + 1; // 為了在輸出中區(qū)分線程 new Thread(() -> { System.out.println("線程" + threadNum + ":已經準備好,等待其他線程。"); // 線程在此等待,直到所有線程都到達這個屏障點 phaser.arriveAndAwaitAdvance(); System.out.println("線程" + threadNum + ":第一階段任務完成。"); // 模擬第二階段的任務 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } // 再次到達屏障點,等待其他線程 phaser.arriveAndAwaitAdvance(); System.out.println("線程" + threadNum + ":第二階段任務完成。"); // 模擬第三階段的任務 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } // 最后一次到達屏障點,所有線程都完成后Phaser將自動進入終止狀態(tài) phaser.arriveAndAwaitAdvance(); System.out.println("線程" + threadNum + ":第三階段任務完成,Phaser任務結束。"); }).start(); } // 等待所有線程完成任務 // 注意:在實際應用中,可能不希望主線程在這里阻塞,而是去做其他工作 // 但為了演示目的,讓主線程等待所有工作線程完成 phaser.awaitAdvance(phaser.getPhase()); System.out.println("所有線程的第一階段任務完成。"); phaser.awaitAdvance(phaser.getPhase() + 1); System.out.println("所有線程的第二階段任務完成。"); phaser.awaitAdvance(phaser.getPhase() + 1); System.out.println("所有線程的第三階段任務完成,整個任務結束。"); } }
在上面代碼中,創(chuàng)建了一個Phaser
實例并初始注冊了3個線程以及主線程,每個線程都執(zhí)行三個階段的任務,每個階段任務之間都通過phaser.arriveAndAwaitAdvance()
方法進行同步,每個線程在完成當前階段的任務后,都會在這個方法上阻塞,直到所有其他線程也完成了它們當前階段的任務,在所有線程都完成最后一個階段的任務后,Phaser
會自動進入終止狀態(tài),此時不會再有線程被阻塞。
上述代碼輸出如下結果:
主線程也已經準備好,等待其他線程。
線程x已經準備好,等待其他線程。
線程y已經準備好,等待其他線程。
線程z已經準備好,等待其他線程。
(這里所有線程和主線程都會等待,直到所有參與者都調用了arriveAndAwaitAdvance)
主線程第一階段任務完成(實際上主線程可能只是監(jiān)控或協(xié)調其他線程)。
線程x第一階段任務完成。
線程y第一階段任務完成。
線程z第一階段任務完成。
(所有線程和主線程繼續(xù)執(zhí)行,直到它們再次調用arriveAndAwaitAdvance)
主線程第二階段任務完成(實際上可能是等待其他線程完成某些任務)。
線程x第二階段任務完成。
線程y第二階段任務完成。
線程z第二階段任務完成。
(所有線程和主線程繼續(xù)執(zhí)行第三階段任務)
主線程第三階段任務完成(實際上可能是進行一些清理工作或者匯總結果)。
線程x第三階段任務完成,Phaser任務結束。
線程y第三階段任務完成,Phaser任務結束。
線程z第三階段任務完成,Phaser任務結束。
核心API
Phaser
它允許一組線程互相等待,直到所有線程都到達某個屏障(barrier)點,Phaser
非常適合用于多階段的任務拆分和同步,以下是Phaser
中一些重要方法的簡要說明:
Phaser(int parties)
: 構造函數,創(chuàng)建一個新的Phaser
實例,并設置注冊的線程數(parties),這個數字表示在繼續(xù)到下一個階段之前,必須到達屏障的線程數。Phaser()
: 構造函數,創(chuàng)建一個新的Phaser
實例,但不設置注冊的線程數,這通常用于層次結構的Phaser
,其中子Phaser
會繼承父Phaser
的注冊線程數。register()
: 增加一個到達屏障所需的線程數,如果調用此方法的線程尚未注冊,它也會將自己注冊為未到達的線程。arrive()
: 表示當前線程已經到達屏障,并減少未到達的線程數,如果這是最后一個到達的線程,并且已經設置了下一個階段的屏障,那么這個方法將返回true
,否則返回false
。arriveAndAwaitAdvance()
: 當前線程到達屏障,并等待其他線程也到達,當所有線程都到達后,屏障會自動推進到下一個階段,然后該方法返回,如果當前Phaser
被終止,這個方法會拋出IllegalStateException
。awaitAdvance(int phase)
: 等待直到屏障推進到給定的階段,如果當前階段大于或等于給定的階段,那么此方法將立即返回。isTerminated()
: 檢查Phaser
是否已經終止,當注冊的線程數減少到零,且沒有新的線程注冊時,Phaser
將被終止。getPhase()
: 獲取當前屏障的階段號,每個屏障都有一個唯一的階段號,初始階段號為0。getRegisteredParties()
: 獲取當前注冊的線程數。getArrivedParties()
: 獲取已經到達當前屏障的線程數。getUnarrivedParties()
: 獲取尚未到達當前屏障的線程數,這實際上是getRegisteredParties()
和getArrivedParties()
之間的差值。forceTermination()
: 強制終止Phaser
,即使還有未到達的線程,這會導致所有等待在arriveAndAwaitAdvance()
或awaitAdvance(int)
方法上的線程拋出IllegalStateException
。onAdvance(int phase, int registeredParties)
: 這是一個受保護的方法,可以在子類中覆蓋,以便在每個屏障階段推進時執(zhí)行自定義操作。bulkRegister(int parties)
: 一次性注冊多個線程,這通常用于靜態(tài)已知的線程數,或者當多個任務由同一個線程代表時。
核心總結
Phaser類的目的是允許在并發(fā)編程中同步多個線程之間的執(zhí)行,它具有如下優(yōu)點,如下:
- 更好的可擴展性:Phaser類相對于其他同步工具類(如CyclicBarrier和CountDownLatch)具有更好的可擴展性,因為它支持更多的參與者(即線程)同時進行同步。
- 自動注銷和清理:當所有參與者都完成執(zhí)行后,Phaser會自動注銷并釋放相關資源,這有助于避免內存泄漏和資源浪費。
- 靈活的執(zhí)行模式:Phaser類提供了多種執(zhí)行模式,如并行、串行和混合模式,這使得在處理并發(fā)任務時更加靈活。
它也有不少缺點,如:1、與其他的同步工具類相比,Phaser類的實現相對復雜,因此在某些場景下可能會引入額外的性能開銷,并且Phaser類具有一定的使用門檻,使用時深入理解并發(fā)編程和Java并發(fā)API,這可能會增加學習成本。
到此這篇關于Java并發(fā)之Phaser的全面解析詳解的文章就介紹到這了,更多相關Java Phaser內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
基于springboot?配置文件context-path的坑
這篇文章主要介紹了基于springboot?配置文件context-path的坑,基于很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01基于MybatisPlus插件TenantLineInnerInterceptor實現多租戶功能
這篇文章主要介紹了基于MybatisPlus插件TenantLineInnerInterceptor實現多租戶功能,需要的朋友可以參考下2021-11-11AsyncHttpClient?RequestFilter請求篩選源碼解讀
這篇文章主要為大家介紹了AsyncHttpClient?RequestFilter請求篩選源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12Java設計模式之策略模式的使用(Strategy?Pattern)
策略模式是一種行為型設計模式,用于定義一系列算法并將每個算法封裝起來,使它們可以互相替換,從而實現代碼的可維護性和靈活性,策略模式包含策略接口、具體策略類和上下文類,并通過將算法的選擇與使用分離,使得算法可以獨立變化2025-03-03