Java線程間通訊的幾種方法小結
一、使用同一個共享變量控制
1、Synchronized、wait、notify
public class Demo1 { private final List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo1 demo =new Demo1(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==1){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==0){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); } }
這段代碼演示了如何使用synchronized
、wait
和notify
來實現(xiàn)兩個線程間的通信,以確保它們交替地向一個ArrayList
中添加數(shù)字。以下是代碼的詳細解釋:
類定義:
Demo1
類中有一個私有的、不可變的(由final
修飾)成員變量list
,它是ArrayList<Integer>
類型的。
主函數(shù):
- 在
main
方法中,首先創(chuàng)建了Demo1
的一個實例demo
。 - 然后啟動了兩個線程,每個線程都執(zhí)行一個特定的任務。
- 在
第一個線程的任務:
- 使用一個for循環(huán),循環(huán)10次。
- 在每次循環(huán)中,首先獲得
demo.list
的鎖,這是通過synchronized (demo.list)
實現(xiàn)的。 - 檢查當前列表的大小是否為奇數(shù)(通過
demo.list.size()%2==1
)。如果是,則調用demo.list.wait()
使當前線程進入等待狀態(tài),并釋放demo.list
的鎖,這樣其他線程可以獲取該鎖并執(zhí)行其任務。 - 當線程從等待狀態(tài)被喚醒時(通過另一個線程的
notify
調用),它會繼續(xù)執(zhí)行,并將當前的數(shù)字添加到列表中。 - 打印當前線程的名稱和更新后的列表。
- 通過調用
demo.list.notify()
喚醒可能正在等待的另一個線程。
第二個線程的任務:
- 它的工作方式與第一個線程非常相似,但有一個關鍵的區(qū)別:它檢查列表的大小是否為偶數(shù),并在這種情況下使線程進入等待狀態(tài)。
交替執(zhí)行:
- 由于兩個線程的工作方式,它們將交替地向列表中添加數(shù)字。當一個線程發(fā)現(xiàn)列表的大小是其期望的(奇數(shù)或偶數(shù))時,它會暫停并等待另一個線程添加一個數(shù)字。然后,它會被另一個線程的
notify
調用喚醒,繼續(xù)其執(zhí)行,并再次使另一個線程等待。
- 由于兩個線程的工作方式,它們將交替地向列表中添加數(shù)字。當一個線程發(fā)現(xiàn)列表的大小是其期望的(奇數(shù)或偶數(shù))時,它會暫停并等待另一個線程添加一個數(shù)字。然后,它會被另一個線程的
注意事項:
- 使用
wait
和notify
時,必須在同步塊或方法中這樣做,否則會拋出IllegalMonitorStateException
。 - 當多個線程可能訪問共享資源(在這里是
demo.list
)時,使用同步是必要的,以確保數(shù)據(jù)的完整性和一致性。 - 雖然在這個特定的例子中只有兩個線程,但這種方法可以擴展到更多的線程,只要它們遵循相同的通信和同步協(xié)議。
- 使用
潛在問題:
- 這個代碼可能存在一個潛在的問題,即“假喚醒”。理論上,一個線程可能會無故地(或由于系統(tǒng)中的其他原因)從
wait
方法中喚醒,即使沒有其他線程明確地調用了notify
或notifyAll
。為了避免這種情況導致的問題,通常在wait
調用周圍使用一個循環(huán)來檢查預期的條件是否仍然成立。如果條件不滿足,則繼續(xù)等待。這通常被稱為“條件變量”的使用模式。
- 這個代碼可能存在一個潛在的問題,即“假喚醒”。理論上,一個線程可能會無故地(或由于系統(tǒng)中的其他原因)從
2、Lock、Condition
import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Task { // 創(chuàng)建一個可重入鎖,用于同步訪問共享資源(即列表) private final Lock lock = new ReentrantLock(); // 創(chuàng)建兩個條件變量,一個用于表示列表未滿,一個用于表示列表非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); // 定義列表的最大容量 private static final int MAX_SIZE = 10; // 創(chuàng)建一個ArrayList作為共享資源,用于在兩個線程之間傳遞數(shù)據(jù) private final List<String> list = new ArrayList<>(MAX_SIZE); // add方法用于向列表中添加元素 public void add() { for (int i = 0; i < 10; i++) { lock.lock(); // 獲取鎖,開始同步代碼塊 try { // 如果列表已滿,則當前線程等待,直到其他線程從列表中移除元素 while (list.size() == MAX_SIZE) { notFull.await(); // 等待列表不滿的條件成立 } // 模擬耗時操作(比如網(wǎng)絡請求或數(shù)據(jù)處理) Thread.sleep(100); // 向列表中添加一個新元素,并打印相關信息 list.add("add " + (i + 1)); System.out.println("The list size is " + list.size()); System.out.println("The add thread is " + Thread.currentThread().getName()); System.out.println("-------------"); // 通知可能在等待的移除線程,現(xiàn)在列表不為空,可以執(zhí)行移除操作了 notEmpty.signal(); } catch (InterruptedException e) { // 打印異常信息,實際開發(fā)中可能需要更復雜的錯誤處理邏輯 e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖,允許其他線程訪問同步代碼塊 } } } // sub方法用于從列表中移除元素 public void sub() { for (int i = 0; i < 10; i++) { lock.lock(); // 獲取鎖,開始同步代碼塊 try { // 如果列表為空,則當前線程等待,直到其他線程向列表中添加元素 while (list.isEmpty()) { notEmpty.await(); // 等待列表非空的條件成立 } // 模擬耗時操作(比如網(wǎng)絡請求或數(shù)據(jù)處理) Thread.sleep(100); // 從列表中移除第一個元素,并打印相關信息 list.remove(0); System.out.println("The list size is " + list.size()); System.out.println("The sub thread is " + Thread.currentThread().getName()); System.out.println("-------------"); // 通知可能在等待的添加線程,現(xiàn)在列表不滿,可以執(zhí)行添加操作了 notFull.signal(); } catch (InterruptedException e) { // 打印異常信息,實際開發(fā)中可能需要更復雜的錯誤處理邏輯 e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖,允許其他線程訪問同步代碼塊 } } } // main方法作為程序的入口點,創(chuàng)建Task對象并啟動兩個線程來執(zhí)行add和sub方法 public static void main(String[] args) { Task task = new Task(); // 創(chuàng)建Task對象,它包含共享資源和同步機制 // 使用Lambda表達式和方法引用啟動兩個線程,分別執(zhí)行add和sub方法,并為它們設置名稱以便區(qū)分輸出中的信息來源 new Thread(task::add, "AddThread").start(); // 啟動添加線程 new Thread(task::sub, "SubThread").start(); // 啟動移除線程 } }
這段代碼定義了一個名為Task
的類,它主要實現(xiàn)了線程安全的列表添加和移除操作。類內部使用了java.util.concurrent.locks
包下的ReentrantLock
可重入鎖以及相關的Condition
條件變量來同步訪問共享資源(即一個ArrayList
)。
在Task
類中,有兩個主要的方法:add
和sub
。add
方法用于向列表中添加元素,而sub
方法用于從列表中移除元素。這兩個方法在被調用時都需要獲取鎖,以確保同一時間只有一個線程可以訪問共享資源。
當添加線程調用add
方法時,它首先檢查列表是否已滿。如果已滿,則通過調用notFull.await()
使當前線程等待,直到其他線程從列表中移除元素并發(fā)出通知。一旦列表不滿,添加線程就會向列表中添加一個新元素,并通過調用notEmpty.signal()
通知可能在等待的移除線程。
類似地,當移除線程調用sub
方法時,它首先檢查列表是否為空。如果為空,則通過調用notEmpty.await()
使當前線程等待,直到其他線程向列表中添加元素并發(fā)出通知。一旦列表非空,移除線程就會從列表中移除一個元素,并通過調用notFull.signal()
通知可能在等待的添加線程。
這種使用鎖和條件變量的方式實現(xiàn)了線程間的同步和通信,確保了共享資源(即列表)在任何時候都不會被多個線程同時修改,從而避免了數(shù)據(jù)競爭和不一致的問題。同時,通過條件變量的等待和通知機制,有效地協(xié)調了添加線程和移除線程的執(zhí)行順序,使得它們能夠按照預期的方式交替進行添加和移除操作。
3、利用volatile
volatile修飾的變量值直接存在主內存里面,子線程對該變量的讀寫直接寫住內存,而不是像其它變量一樣在local thread里面產(chǎn)生一份copy。volatile能保證所修飾的變量對于多個線程可見性,即只要被修改,其它線程讀到的一定是最新的值。
public class Demo2 { private volatile List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo2 demo =new Demo2(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); } }
4、利用AtomicInteger
和volatile類似
二、PipedInputStream、PipedOutputStream
這里用流在兩個線程間通信,但是Java中的Stream是單向的,所以在兩個線程中分別建了一個input和output
public class PipedDemo { private final PipedInputStream inputStream1; private final PipedOutputStream outputStream1; private final PipedInputStream inputStream2; private final PipedOutputStream outputStream2; public PipedDemo(){ inputStream1 = new PipedInputStream(); outputStream1 = new PipedOutputStream(); inputStream2 = new PipedInputStream(); outputStream2 = new PipedOutputStream(); try { inputStream1.connect(outputStream2); inputStream2.connect(outputStream1); } catch (IOException e) { e.printStackTrace(); } } /**程序退出時,需要關閉stream*/ public void shutdown() throws IOException { inputStream1.close(); inputStream2.close(); outputStream1.close(); outputStream2.close(); } public static void main(String[] args) throws IOException { PipedDemo demo =new PipedDemo(); new Thread(()->{ PipedInputStream in = demo.inputStream2; PipedOutputStream out = demo.outputStream2; for (int i = 0; i < 10; i++) { try { byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("go".equals(new String(inArr))) break; } out.write("ok".getBytes()); } catch (IOException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ PipedInputStream in = demo.inputStream1; PipedOutputStream out = demo.outputStream1; for (int i = 0; i < 10; i++) { try { out.write("go".getBytes()); byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("ok".equals(new String(inArr))) break; } } catch (IOException e) { e.printStackTrace(); } } }).start(); // demo.shutdown(); } }
輸出
Thread-0: 0 go
Thread-1: 0 ok
Thread-0: 1 go
Thread-1: 1 ok
Thread-0: 2 go
Thread-1: 2 ok
Thread-0: 3 go
Thread-1: 3 ok
Thread-0: 4 go
Thread-1: 4 ok
Thread-0: 5 go
Thread-1: 5 ok
Thread-0: 6 go
Thread-1: 6 ok
Thread-0: 7 go
Thread-1: 7 ok
Thread-0: 8 go
Thread-1: 8 ok
Thread-0: 9 go
Thread-1: 9 ok
三、利用BlockingQueue
BlockingQueue定義的常用方法如下:
- add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常。
- offer(Object):表示如果可能的話,將Object加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false。
- put(Object):把Object加到BlockingQueue里,如果BlockingQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里有空間再繼續(xù)。
- poll(time):獲取并刪除BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null。當不傳入time值時,立刻返回。
- peek():立刻獲取BlockingQueue里排在首位的對象,但不從隊列里刪除,如果隊列為空,則返回null。
- take():獲取并刪除BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的對象被加入為止。
BlockingQueue有四個具體的實現(xiàn)類:
- ArrayBlockingQueue:數(shù)組阻塞隊列,規(guī)定大小,其構造函數(shù)必須帶一個int參數(shù)來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
- LinkedBlockingQueue:鏈阻塞隊列,大小不定,若其構造函數(shù)帶一個規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO順序排序的。
- PriorityBlockingQueue:類似于LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構造函數(shù)所帶的Comparator決定的順序。
- SynchronousQueue:特殊的BlockingQueue,它的內部同時只能夠容納單個元素,對其的操作必須是放和取交替完成的。
- DelayQueue:延遲隊列,注入其中的元素必須實現(xiàn) java.util.concurrent.Delayed 接口
所有BlockingQueue的使用方式類似,以下例子一個線程寫入,一個線程讀取,操作的是同一個Queue:
public class BlockingQueueDemo { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); //讀線程 new Thread(() -> { int i =0; while (true) { try { String item = queue.take(); System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); i++; } catch (Exception e) { e.printStackTrace(); } } }).start(); //寫線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String item = "go"+i; System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); queue.put(item); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
到此這篇關于Java線程間通訊的幾種方法小結的文章就介紹到這了,更多相關Java線程間通訊內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Jmeter自定義函數(shù)base64加密實現(xiàn)過程解析
這篇文章主要介紹了Jmeter自定義函數(shù)base64加密實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-07-07SpringBoot如何整合redis實現(xiàn)過期key監(jiān)聽事件
這篇文章主要介紹了SpringBoot如何整合redis實現(xiàn)過期key監(jiān)聽事件,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-09-09java ExecutorService CompletionService線程池區(qū)別與選擇
這篇文章主要為大家介紹了java ExecutorService CompletionService線程池區(qū)別與選擇使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09使用springmvc的controller層獲取到請求的數(shù)據(jù)方式
這篇文章主要介紹了使用springmvc的controller層獲取到請求的數(shù)據(jù)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08Java8 中使用Stream 讓List 轉 Map使用問題小結
這篇文章主要介紹了Java8 中使用Stream 讓List 轉 Map使用總結,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-06-06Java NumberFormat格式化float類型的bug
今天小編就為大家分享一篇關于Java NumberFormat格式化float類型的bug,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10IntelliJ IDEA 2020最新激活碼(親測有效,可激活至 2089 年
這篇文章主要介紹了IntelliJ IDEA 2021最新激活碼(親測有效,可激活至 2089 年),非常不錯,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04