深入理解Java 線程通信
當(dāng)線程在系統(tǒng)內(nèi)運(yùn)行時(shí),線程的調(diào)度具有一定的透明性,程序通常無(wú)法準(zhǔn)確控制線程的輪換執(zhí)行,但 Java 也提供了一些機(jī)制來(lái)保證線程協(xié)調(diào)運(yùn)行。
傳統(tǒng)的線程通信
假設(shè)現(xiàn)在系統(tǒng)中有兩個(gè)線程,這兩個(gè)線程分別代表存款者和取錢者——現(xiàn)在假設(shè)系統(tǒng)有一種特殊的要求,系統(tǒng)要求存款者和取錢者不斷地重復(fù)存款、取錢的動(dòng)作,而且要求每當(dāng)存款者將錢存入指定賬戶后,取錢者就立即取出該筆錢。不允許存款者連續(xù)兩次存錢,也不允許取錢者連續(xù)兩次取錢。
為了實(shí)現(xiàn)這種功能,可以借助于 Object 類提供的 wait()、 notify() 和 notifyAll() 三個(gè)方法,這三個(gè)方法并不屬于 Thread 類,而是屬于 Object 類。但這三個(gè)方法必須由同步監(jiān)視器對(duì)象來(lái)調(diào)用,這可分成以下兩種情況。
- 對(duì)于使用 synchronized 修飾的同步方法,因?yàn)樵擃惖哪J(rèn)實(shí)例(this)就是同步監(jiān)視器,所以可以在同步方法中直接調(diào)用這三個(gè)方法。
- 對(duì)于使用 synchronized 修飾的同步代碼塊,同步監(jiān)視器是 synchronized 后括號(hào)里的對(duì)象,所以必須使用該對(duì)象調(diào)用這三個(gè)方法。
關(guān)于這三個(gè)方法的解釋如下。
- wait():導(dǎo)致當(dāng)前線程等待,直到其他線程調(diào)用該同步監(jiān)視器的 notify() 方法或 notifyAll() 方法來(lái)喚醒該線程。該 wait() 方法有三種形式——無(wú)時(shí)間參數(shù)的 wait (—直等待,直到其他線程通知 )、帶毫秒?yún)?shù)的 wait() 和帶毫秒、毫微秒?yún)?shù)的 wait() (這兩種方法都是等待指定時(shí)間后自動(dòng)蘇醒)。調(diào)用 wait() 方法的當(dāng)前線程會(huì)釋放對(duì)該同步監(jiān)視器的鎖定。
- notify():?jiǎn)拘言诖送奖O(jiān)視器上等待的單個(gè)線程。如果所有線程都在此同步監(jiān)視器上等待,則會(huì)選擇喚醒其中一個(gè)線程。選擇是任意性的。只有當(dāng)前線程放棄對(duì)該同步監(jiān)視器的鎖定后(使用 wait() 方法),才可以執(zhí)行被喚醒的線程。
- notifyAll():?jiǎn)拘言诖送奖O(jiān)視器上等待的所有線程。只有當(dāng)前線程放棄對(duì)該同步監(jiān)視器的鎖定后,才可以執(zhí)行被喚醒的線程。
程序中可以通過(guò)一個(gè)旗標(biāo)來(lái)標(biāo)識(shí)賬戶中是否已有存款,當(dāng)旗標(biāo)為 false 時(shí),表明賬戶中沒(méi)有存款,存款者線程可以向下執(zhí)行,當(dāng)存款者把錢存入賬戶后,將旗標(biāo)設(shè)為 true ,并調(diào)用 notify() 或 notifyAll() 方法來(lái)喚醒其他線程;當(dāng)存款者線程進(jìn)入線程體后,如果旗標(biāo)為 true 就調(diào)用 wait() 方法讓該線程等待。
當(dāng)旗標(biāo)為 true 時(shí),表明賬戶中已經(jīng)存入了存款,則取錢者線程可以向下執(zhí)行,當(dāng)取錢者把錢從賬戶中取出后,將旗標(biāo)設(shè)為 false ,并調(diào)用 notify() 或 notifyAll() 方法來(lái)喚醒其他線程;當(dāng)取錢者線程進(jìn)入線程體后,如果旗標(biāo)為 false 就調(diào)用 wait() 方法讓該線程等待。
本程序?yàn)?Account 類提供 draw() 和 deposit() 兩個(gè)方法,分別對(duì)應(yīng)該賬戶的取錢、存款等操作,因?yàn)檫@兩個(gè)方法可能需要并發(fā)修改 Account 類的 balance 成員變量的值,所以這兩個(gè)方法都使用 synchronized 修飾成同步方法。除此之外,這兩個(gè)方法還使用了 wait() 和 notifyAll() 來(lái)控制線程的協(xié)作。
public class Account{ private String accountNo; private double balance; //標(biāo)識(shí)賬戶中是否已有存款的旗標(biāo) private boolean flag = false; public Account(){} public Account(String accountNo , double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public synchronized void draw(double drawAmount){ try{ //如果flag為假,表明賬戶中還沒(méi)有人存錢進(jìn)去,則取錢方法阻塞 if (!flag){ wait(); }else{ //執(zhí)行取錢 System.out.println(Thread.currentThread().getName() + " 取錢:" + drawAmount); balance -= drawAmount; System.out.println("賬戶余額為:" + balance); //將標(biāo)識(shí)賬戶是否已有存款的旗標(biāo)設(shè)為false。 flag = false; //喚醒其他線程 notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public synchronized void deposit(double depositAmount){ try{ //如果flag為真,表明賬戶中已有人存錢進(jìn)去,則存錢方法阻塞 if (flag){ // ① wait(); }else{ //執(zhí)行存款 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("賬戶余額為:" + balance); //將表示賬戶是否已有存款的旗標(biāo)設(shè)為true flag = true; //喚醒其他線程 notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; } }
上面程序中的粗體字代碼使用 wait() 和 notifyAll() 進(jìn)行了控制,對(duì)存款者線程而言,當(dāng)程序進(jìn)入 deposit() 方法后,如果 flag 為 true ,則表明賬戶中已有存款,程序調(diào)用 wait() 方法阻塞;否則程序向下執(zhí)行存款操作,當(dāng)存款操作執(zhí)行完成后,系統(tǒng)將 flag 設(shè)為 true,然后調(diào)用 notifyAll() 來(lái)喚醒其他被阻塞的線程——如果系統(tǒng)中有存款者線程,存款者線程也會(huì)被喚醒,但該存款者線程執(zhí)行到①號(hào)代碼處時(shí)再次進(jìn)入阻塞狀態(tài),只有執(zhí)行 draw() 方法的取錢者線程才可以向下執(zhí)行。同理,取錢者線程的運(yùn)行流程也是如此。
程序中的存款者線程循環(huán)100次重復(fù)存款,而取錢者線程則循環(huán)100次重復(fù)取錢,存款者線程和取錢者線程分別調(diào)用 Account 對(duì)象的 deposit()、 draw() 方法來(lái)實(shí)現(xiàn)。
public class DrawThread extends Thread{ //模擬用戶賬戶 private Account account; //當(dāng)前取錢線程所希望取的錢數(shù) private double drawAmount; public DrawThread(String name, Account account, double drawAmount){ super(name); this.account = account; this.drawAmount = drawAmount; } //重復(fù)100次執(zhí)行取錢操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.draw(drawAmount); } } }
public class DepositThread extends Thread{ //模擬用戶賬戶 private Account account; //當(dāng)前取錢線程所希望存款的錢數(shù) private double depositAmount; public DepositThread(String name, Account account, double depositAmount){ super(name); this.account = account; this.depositAmount = depositAmount; } //重復(fù)100次執(zhí)行存款操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.deposit(depositAmount); } } }
主程序可以啟動(dòng)任意多個(gè)存款線程和取錢線程,可以看到所有的取錢線程必須等存款線程存錢后才可以向下執(zhí)行,而存款線程也必須等取錢線程取錢后才可以向下執(zhí)行。主程序代碼如下。
public class TestDraw{ public static void main(String[] args){ //創(chuàng)建一個(gè)賬戶 Account acct = new Account("1234567" , 0); new DrawThread("取錢者" , acct , 800).start(); new DepositThread("存款者甲" , acct , 800).start(); new DepositThread("存款者乙" , acct , 800).start(); new DepositThread("存款者丙" , acct , 800).start(); } }
運(yùn)行該程序,可以看到存款者線程、取錢者線程交替執(zhí)行的情形,每當(dāng)存款者向賬戶中存入800元之后,取錢者線程立即從賬戶中取出這筆錢。存款完成后賬戶余額總是800元,取錢結(jié)束后賬戶余額總是0元。運(yùn)行該程序,會(huì)看到如下圖所示的結(jié)果。
從上圖中可以看出 , 3個(gè)存款者線程隨機(jī)地向賬戶中存款,只有1個(gè)取錢者線程執(zhí)行取錢操作。只有當(dāng)取錢者取錢后,存款者才可以存款;同理,只有等存款者存款后,取錢者線程才可以取錢。
上圖顯示程序最后被阻塞無(wú)法繼續(xù)向下執(zhí)行,這因?yàn)?個(gè)存款者線程共有300次存款操作,但1個(gè)取錢者線程只有100次取錢操作,所以程序最后被阻塞。
注意:上圖所示的阻塞并不是死鎖,對(duì)于這種情況,取錢者線程已經(jīng)執(zhí)行結(jié)束,而存款者線程只是在等待其他線程來(lái)取錢而已,并不是等待其他線程釋放同步監(jiān)視器。不要把死鎖和程序阻塞等同起來(lái)!
使用 Condition 控制線程通信
如果程序不使用 synchronized 關(guān)鍵字來(lái)保證同步,而是直接使用 Lock 對(duì)象來(lái)保證同步,則系統(tǒng)中不存在隱式的同步監(jiān)視器,也就不能使用 wait()、notify()、notifyAll() 方法進(jìn)行線程通信了。
當(dāng)使用 Lock 對(duì)象來(lái)保證同步時(shí),Java 提供了一個(gè) Condition 類來(lái)保持協(xié)調(diào),使用 Condition 可以讓那些已經(jīng)得到 Lock 對(duì)象卻無(wú)法繼續(xù)執(zhí)行的線程釋放 Lock 對(duì)象,Condition 對(duì)象也可以喚醒其他處于等待的線程。
Condition 將同步監(jiān)視器方法(wait()、notify() 和 notifyAll() )分解成截然不同的對(duì)象,以便通過(guò)將這些對(duì)象與 Lock 對(duì)象組合使用,為每個(gè)對(duì)象提供多個(gè)等待集(wait-set)。在這種情況下,Lock 替代了同步方法或同步代碼塊,Condition 替代了同步監(jiān)視器的功能。
Condition 實(shí)例被綁定在一個(gè) Lock 對(duì)象上。要獲得特定 Lock 實(shí)例的 Condition 實(shí)例,調(diào)用 Lock 對(duì)象的 newCondition() 方法即可。Condition 類提供了如下三個(gè)方法。
- await():類似于隱式同步監(jiān)視器上的 wait() 方法,導(dǎo)致當(dāng)前線程等待,直到其他線程調(diào)用該 Condition 的 signal() 方法或 signalAll() 方法來(lái)喚醒該線程。該 await() 方法有更多變體,如 long awaitNanos(long nanosTimeout)、 void awaitUninterruptibly() 、 awaitUntil(Date deadline) 等,可以完成更豐富的等待操作。
- signal():?jiǎn)拘言诖?Lock 對(duì)象上等待的單個(gè)線程。如果所有線程都在該 Lock 對(duì)象上等待,則會(huì)選擇喚醒其中一個(gè)線程。選擇是任意性的。只有當(dāng)前線程放棄對(duì)該 Lock 對(duì)象的鎖定后(使用 await() 方法),才可以執(zhí)行被喚醒的線程。
- signalAll():?jiǎn)拘言诖?Lock 對(duì)象上等待的所有線程。只有當(dāng)前線程放棄對(duì)該 Lock 對(duì)象的鎖定后,才可以執(zhí)行被喚醒的線程。
下面程序中 Account 使用 Lock 對(duì)象來(lái)控制同步,并使用 Condition 對(duì)象來(lái)控制線程的協(xié)調(diào)運(yùn)行。
public class Account{ //顯示定義Lock對(duì)象 private final Lock lock = new ReentrantLock(); //獲得指定Lock對(duì)象對(duì)應(yīng)的條件變量 private final Condition cond = lock.newCondition(); private String accountNo; private double balance; //標(biāo)識(shí)賬戶中是否已經(jīng)存款的旗標(biāo) private boolean flag = false; public Account(){} public Account(String accountNo , double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public void draw(double drawAmount){ //加鎖 lock.lock(); try{ //如果賬戶中還沒(méi)有存入存款,該線程等待 if (!flag){ cond.await(); }else{ //執(zhí)行取錢操作 System.out.println(Thread.currentThread().getName() + " 取錢:" + drawAmount); balance -= drawAmount; System.out.println("賬戶余額為:" + balance); //將標(biāo)識(shí)是否成功存入存款的旗標(biāo)設(shè)為false flag = false; //喚醒該Lock對(duì)象對(duì)應(yīng)的其他線程 cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來(lái)確保釋放鎖 finally{ lock.unlock(); } } public void deposit(double depositAmount){ lock.lock(); try{ //如果賬戶中已經(jīng)存入了存款,該線程等待 if(flag){ cond.await(); }else{ //執(zhí)行存款操作 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("賬戶余額為:" + balance); //將標(biāo)識(shí)是否成功存入存款的旗標(biāo)設(shè)為true flag = true; //喚醒該Lock對(duì)象對(duì)應(yīng)的其他線程 cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來(lái)確保釋放鎖 finally{ lock.unlock(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; } }
顯式地使用 Lock 對(duì)象來(lái)充當(dāng)同步監(jiān)視器,則需要使用 Condition 對(duì)象來(lái)暫停、喚醒指定線程。存取錢的代碼和最上面相同。
使用阻塞隊(duì)列(BlockingQueue)控制線程通信
Java5 提供了一個(gè) BlockingQueue 接口,雖然 BlockingQueue 也是 Queue 的子接口,但它的主要用途并不是作為容器,而是作為線程同步的工具。 BlockingQueue 具有一個(gè)特征:當(dāng)生產(chǎn)者線程試圖向 BlockingQueue 中放入元素時(shí),如果該隊(duì)列已滿,則該線程被阻塞;當(dāng)消費(fèi)者線程試圖從 BlockingQueue 中取出元素時(shí),如果該隊(duì)列已空,則該線程被阻塞。
程序的兩個(gè)線程通過(guò)交替向 BlockingQueue 中放入元素、取出元素,即可很好地控制線程的通信。BlockingQueue 提供如下兩個(gè)支持阻塞的方法。
- put(E e):嘗試把 E 元素放入 BlockingQueue 中,如果該隊(duì)列的元素己滿,則阻塞該線程。
- take():嘗試從 BlockingQueue 的頭部取出元素,如果該隊(duì)列的元素已空,則阻塞該線程。
BlockingQueue 繼承了 Queue 接口,當(dāng)然也可使用 Queue 接口中的方法。這些方法歸納起來(lái)可分為如下三組。
- 在隊(duì)列尾部插入元素。包括 add(E e)、offer(E e) 和 put(E e) 方法,當(dāng)該隊(duì)列已滿時(shí),這三個(gè)方法分別會(huì)拋出異常、返回 false 、阻塞隊(duì)列。
- 在隊(duì)列頭部刪除并返回刪除的元素。包括 remove()、 poll() 和 take() 方法。當(dāng)該隊(duì)列已空時(shí),這三個(gè)方法分別會(huì)拋出異常、返回 false 、阻塞隊(duì)列。
- 在隊(duì)列頭部取出但不刪除元素。包括 element() 和 peek() 方法,當(dāng)隊(duì)列已空時(shí),這兩個(gè)方法分別拋出異常、返回 false
BlockingQueue 包含的方法之間的對(duì)應(yīng)關(guān)系如下表所示:
BlockingQueue 與其實(shí)現(xiàn)類之間的類圖如下圖所示。
上圖中以黑色方框框出的都是 Java7 新增的阻塞隊(duì)列??梢钥吹?, BlockingQueue 包含如下5個(gè)實(shí)現(xiàn)類。
- ArrayBlockingQueue:基于數(shù)組實(shí)現(xiàn)的 BlockingQueue 隊(duì)列。
- LinkedBlockingQueue:基于鏈表實(shí)現(xiàn)的 BlockingQueue 隊(duì)列。
- PriorityBlockingQueue:它并不是標(biāo)準(zhǔn)的阻塞隊(duì)列。與前面介紹的 PriorityQueue 類似,該隊(duì)列調(diào)用 remove()、poll()、take() 等方法取出元素時(shí),并不是取出隊(duì)列中存在時(shí)間最長(zhǎng)的元素,而是隊(duì)列中最小的元素。 PriorityBlockingQueue 判斷元素的大小即可根據(jù)元素(實(shí)現(xiàn) Comparable 接口)的本身大小來(lái)自然排序,也可使用 Comparator 進(jìn)行定制排序。
- SynchronousQueue:同步隊(duì)列。對(duì)該隊(duì)列的存、取操作必須交替進(jìn)行。
- DelayQueue:它是一個(gè)特殊的 BlockingQueue ,底層基于 PriorityBlockingQueue 實(shí)現(xiàn)。不過(guò),DelayQueue 要求集合元素都實(shí)現(xiàn) Delay 接口(該接口里只有一個(gè) long getDelay() 方法),DelayQueue 根據(jù)集合元素的 getDalay() 方法的返回值進(jìn)行排序。
下面以 ArrayBlockingQueue 為例介紹阻塞隊(duì)列的功能和用法。下面先用一個(gè)最簡(jiǎn)單的程序來(lái)測(cè)試 BlockingQueue 的 put() 方法。
public class BlockingQueueTest { public static void main(String[] args) throws Exception { BlockingQueue<String> bq = new ArrayBlockingQueue<>(2); bq.put("Java"); // 與bq.add("Java")、bq.offer("Java") 相同 bq.put("Java"); // 與bq.add("Java")、bq.offer("Java") 相同 bq.put("Java"); // ① 阻塞線程 } }
上面程序先定義一個(gè)大小為2的 BlockingQueue,程序先向該隊(duì)列中放入兩個(gè)元素,此時(shí)隊(duì)列還沒(méi)有滿,兩個(gè)元紊都可以放入,因此使用 put()、add() 和 offer() 方法效果完全一樣。當(dāng)程序試圖放入第三個(gè)元素時(shí),如果使用 put() 方法嘗試放入元素將會(huì)阻寒線程,如上面程序①號(hào)代碼所示。如果使用 add() 方法嘗試放入元素將會(huì)引發(fā)異常;如果使用 offer() 方法嘗試放入元素則會(huì)返回 false,元素不會(huì)被放入。
與此類似的是,在 BlockingQueue 已空的情況下,程序使用 take() 方法嘗試取出元素將會(huì)阻塞線程:使用 remove() 方法嘗試取出元素將引發(fā)異常:使用 poll() 方法嘗試取出元素將返回 false,元索不會(huì)被刪除。
掌握了 BlodcingQuene 阻塞隊(duì)列的特性之后,下面程序就可以利用 BlockingQueue 來(lái)實(shí)現(xiàn)線程通信了。
public class Producer extends Thread { private BlockingQueue<String> bq; public Producer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { String[] strArr = new String[] { "Java", "Struts", "Spring" }; for(int i=0;i<99999999;i++) { System.out.println(getName()+"生產(chǎn)者準(zhǔn)備生產(chǎn)集合元素"); try { Thread.sleep(200); // 嘗試放入元素,如果隊(duì)列已滿,則線程被阻塞 bq.put(strArr[i%3]); }catch(Exception ex) { ex.printStackTrace(); } System.out.println(getName()+"生產(chǎn)完成:"+bq); } } } public class Consumer extends Thread { private BlockingQueue<String> bq; public Consumer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { while(true) { System.out.println(getName()+"消費(fèi)者準(zhǔn)備消費(fèi)集合元素!"); try { Thread.sleep(200); // 嘗試取出元素,如果隊(duì)列已空,則線程被阻塞 bq.take(); }catch(Exception ex) { ex.printStackTrace(); } System.out.println(getName()+"消費(fèi)完成:"+bq); } } } public class BlockingQueueTest2 { public static void main(String[] args) { // 創(chuàng)建一個(gè)容量為1的BlockingQueue BlockingQueue<String> bq = new ArrayBlockingQueue<>(1); // 啟動(dòng)3個(gè)生產(chǎn)者線程 new Producer(bq).start(); new Producer(bq).start(); new Producer(bq).start(); // 啟動(dòng)一個(gè)消費(fèi)者線程 new Consumer(bq).start(); } }
上面程序啟動(dòng)了 3個(gè)生產(chǎn)者線程向 BlockingQueue 集合放入元素,啟動(dòng)了 1個(gè)消費(fèi)者線程從 BlockingQueue 集合取出元素。本程序的 BlockingQueue 集合容量為1,因此3個(gè)生產(chǎn)者線程無(wú)法連續(xù)放入元素,必須等待消費(fèi)者線程取出一個(gè)元素后 , 3個(gè)生產(chǎn)者線程的其中之一才能放入一個(gè)元素。運(yùn)行該程序,會(huì)看到如下圖所示的結(jié)果。
從上圖可以看出,3個(gè)生產(chǎn)者線程都想向 BlockingQueue 中放入元素,但只要其中一個(gè)線程向該隊(duì)列中放入元素之后,其他生產(chǎn)者線程就必須等待,等待消費(fèi)者線程取出 BlockingQueue 隊(duì)列里的元素。
以上就是深入理解Java 線程通信的詳細(xì)內(nèi)容,更多關(guān)于Java 線程通信的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis 延遲加載、一級(jí)緩存、二級(jí)緩存(詳解)
下面小編就為大家?guī)?lái)一篇MyBatis 延遲加載、一級(jí)緩存、二級(jí)緩存(詳解)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-08-08Spring AOP之@Around,@AfterReturning使用、切不進(jìn)去的解決方案
這篇文章主要介紹了Spring AOP之@Around,@AfterReturning使用、切不進(jìn)去的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05java中int初始化可以為0,但不能為NULL問(wèn)題
這篇文章主要介紹了java中int初始化可以為0,但不能為NULL問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02從Android源碼剖析Intent查詢匹配的實(shí)現(xiàn)
這篇文章主要介紹了從Android源碼剖析Intent查詢匹配的實(shí)現(xiàn),Intent部分的源碼為Java代碼,需要的朋友可以參考下2015-07-07