Java多線程之同步工具類Exchanger
1 Exchanger 介紹
前面分別介紹了CyclicBarrier、CountDownLatch、Semaphore,現(xiàn)在介紹并發(fā)工具類中的最后一個Exchange。
Exchanger 是一個用于線程間協(xié)作的工具類,Exchanger用于進行線程間的數(shù)據(jù)交換,它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過exchange 方法交換數(shù)據(jù),如果第一個線程先執(zhí)行exchange 方法,它會一直等待第二個線程也執(zhí)行exchange 方法,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù)。
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
在以上的描述中,有幾個要點:
- 此類提供對外的操作是同步的;
- 用于成對出現(xiàn)的線程之間交換數(shù)據(jù);
- 可以視作雙向的同步隊列;
- 可應(yīng)用于基因算法、流水線設(shè)計等場景。
- 接著看api文檔,這個類提供對外的接口非常簡潔,一個無參構(gòu)造函數(shù),兩個重載的范型exchange方法:
public V exchange(V x) throws InterruptedException public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
2 Exchanger 實例
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
executor.execute(new Runnable() {
String data = "data1";
@Override
public void run() {
doExchangeWork(data, exchanger);
}
});
executor.execute(new Runnable() {
String data = "data2";
@Override
public void run() {
doExchangeWork(data, exchanger);
}
});
executor.shutdown();
}
private static void doExchangeWork(String data, Exchanger exchanger) {
try {
System.out.println(Thread.currentThread().getName() + "正在把數(shù)據(jù) " + data + " 交換出去");
Thread.sleep((long) (Math.random() * 1000));
String exchangeData = (String) exchanger.exchange(data);
System.out.println(Thread.currentThread().getName() + "交換得到數(shù)據(jù) " + exchangeData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-1正在把數(shù)據(jù) data1 交換出去
pool-1-thread-2正在把數(shù)據(jù) data2 交換出去
pool-1-thread-2交換得到數(shù)據(jù) data1
pool-1-thread-1交換得到數(shù)據(jù) data2
當線程A調(diào)用Exchange對象的exchange()方法后,他會陷入阻塞狀態(tài),直到線程B也調(diào)用了exchange()方法,然后以線程安全的方式交換數(shù)據(jù),之后線程A和B繼續(xù)運行。
exchange等待超時
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
executor.execute(new Runnable() {
String data = "data1";
@Override
public void run() {
doExchangeWork(data, exchanger);
}
});
executor.execute(new Runnable() {
String data = "data2";
@Override
public void run() {
try {
Thread.sleep((long) (3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
doExchangeWork(data, exchanger);
}
});
executor.shutdown();
}
private static void doExchangeWork(String data, Exchanger exchanger) {
try {
System.out.println(Thread.currentThread().getName() + "正在把數(shù)據(jù) " + data + " 交換出去");
//遠小于3秒拋出異常
String exchangeData = (String) exchanger.exchange(data,1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "交換得到數(shù)據(jù) " + exchangeData);
} catch ( TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-1正在把數(shù)據(jù) data1 交換出去
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$1.run(ExchangerTest.java:12)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2正在把數(shù)據(jù) data2 交換出去
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$2.run(ExchangerTest.java:26)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
實戰(zhàn)場景:
設(shè)計一個定時任務(wù),每日凌晨執(zhí)行。在定時任務(wù)中啟動兩個線程,一個線程負責對業(yè)務(wù)明細表(xxx_info)進行查詢統(tǒng)計,把統(tǒng)計的結(jié)果放置在內(nèi)存緩沖區(qū),另一個線程負責讀取緩沖區(qū)中的統(tǒng)計結(jié)果并插入到業(yè)務(wù)統(tǒng)計表(xxx_statistics)中。
親,這樣的場景是不是聽起來很有感覺?沒錯!兩個線程在內(nèi)存中批量交換數(shù)據(jù),這個事情我們可以使用Exchanger去做!
3 實現(xiàn)原理
Exchanger(交換者)是一個用于線程間協(xié)作的工具類。Exchanger用于進行線程間的數(shù)據(jù)交換。它提供一個同步點,在這個同步點兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過exchange方法交換數(shù)據(jù), 如果第一個線程先執(zhí)行exchange方法,它會一直等待第二個線程也執(zhí)行exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。因此使用Exchanger的重點是成對的線程使用exchange()方法,當有一對線程達到了同步點,就會進行交換數(shù)據(jù)。因此該工具類的線程對象是成對的。
Exchanger類提供了兩個方法,String exchange(V x):用于交換,啟動交換并等待另一個線程調(diào)用exchange;String exchange(V x,long timeout,TimeUnit unit):用于交換,啟動交換并等待另一個線程調(diào)用exchange,并且設(shè)置最大等待時間,當?shù)却龝r間超過timeout便停止等待。
到此這篇關(guān)于Java多線程之同步工具類Exchanger的文章就介紹到這了,更多相關(guān)Java多線程 Exchanger內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA mybatis-generator逆向工程生成代碼
這篇文章主要介紹了IDEA mybatis-generator逆向工程生成代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-06-06
spring boot整合mybatis+mybatis-plus的示例代碼
這篇文章主要介紹了spring boot整合mybatis+mybatis-plus的示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-01-01
SpringSecurity跨域請求偽造(CSRF)的防護實現(xiàn)
本文主要介紹了SpringSecurity跨域請求偽造(CSRF)的防護實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
Java8新特性之Base64詳解_動力節(jié)點Java學(xué)院整理
這篇文章主要為大家詳細介紹了Java8新特性之Base64的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-06-06
Java中的static關(guān)鍵字用法總結(jié)
這篇文章主要介紹了Java中的static關(guān)鍵字用法總結(jié),static是Java50個關(guān)鍵字之一,static關(guān)鍵字可以用來修飾代碼塊表示靜態(tài)代碼塊,修飾成員變量表示全局靜態(tài)成員變量,修飾方法表示靜態(tài)方法,需要的朋友可以參考下2023-11-11
RestTemplate發(fā)送get和post請求,下載文件的實例
這篇文章主要介紹了RestTemplate發(fā)送get和post請求,下載文件的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09

