徹底搞懂Java多線程(五)
單例模式與多線程
單例模式就是全局唯一但是所有程序都可以使用的對象
寫單例模式步驟:
1.將構(gòu)造函數(shù)設(shè)置為私有的
2.創(chuàng)建一個靜態(tài)的類變量
3.提供獲取單例的方法
立即加載/餓漢模式
/**
* user:ypc;
* date:2021-06-13;
* time: 21:02;
*/
//餓漢方式實現(xiàn)單例模式
public class Singleton {
//1.將構(gòu)造函數(shù)設(shè)置為私有的,不然外部可以創(chuàng)建
private Singleton(){
}
//2.創(chuàng)建靜態(tài)的類變量(讓第三步的方法進行返回)
private static Singleton singleton = new Singleton();
//給外部接口提供的獲取單例的方法
public static Singleton getInstance(){
return singleton;
}
}
測試餓漢的單例模式
//測試餓漢方式實現(xiàn)的單例模式,創(chuàng)建兩個線程,看是不是得到了一個實列對象,如果為true就說明餓漢的單例模式?jīng)]有問題
static Singleton singleton1 = null;
static Singleton singleton2 = null;
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
singleton1 = Singleton.getInstance();
});
Thread thread2 = new Thread(() -> {
singleton2 = Singleton.getInstance();
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(singleton1 == singleton2);
}

延時加載/懶漢模式
不會隨著程序的啟動而啟動,而是等到有人調(diào)用它的時候,它才會初始化
/**
* user:ypc;
* date:2021-06-13;
* time: 21:22;
*/
//懶漢方式實現(xiàn)單例模式
public class Singleton2 {
static class Singleton {
//1.設(shè)置私有的構(gòu)造函數(shù)
private Singleton() {
}
//2.提供一個私有的靜態(tài)變量
private static Singleton singleton = null;
//3.提供給外部調(diào)用,返回一個單例對象給外部
public static Singleton getInstance() {
if (singleton == null) {
singleton = new Singleton();
}
return singleton;
}
}
}
那么這樣寫有什么問題呢?我們來看看多線程情況下的懶漢方式實現(xiàn)單例模式:
/**
* user:ypc;
* date:2021-06-13;
* time: 21:22;
*/
//懶漢方式實現(xiàn)單例模式
public class Singleton2 {
static class Singleton {
//1.設(shè)置私有的構(gòu)造函數(shù)
private Singleton() {
}
//2.提供一個私有的靜態(tài)變量
private static Singleton singleton = null;
//3.提供給外部調(diào)用,返回一個單例對象給外部
public static Singleton getInstance() throws InterruptedException {
if (singleton == null) {
Thread.sleep(100);
singleton = new Singleton();
}
return singleton;
}
}
static Singleton singleton1 = null;
static Singleton singleton2 = null;
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
try {
singleton1 = Singleton.getInstance();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread thread2 = new Thread(() -> {
try {
singleton2 = Singleton.getInstance();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(singleton1 == singleton2);
}
}
結(jié)果:

所以發(fā)生了線程不安全的問題
那么要如何更改呢?
加鎖:👇

結(jié)果就是true了:

給方法加鎖可以實現(xiàn)線程安全,但是所鎖的粒度太大。
使用雙重校驗鎖優(yōu)化后:
static class Singleton {
//1.設(shè)置私有的構(gòu)造函數(shù)
private Singleton() {
}
//2.提供一個私有的靜態(tài)變量
private static Singleton singleton = null;
//3.提供給外部調(diào)用,返回一個單例對象給外部
public static Singleton getInstance() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}

那么這樣寫就沒有問題了嗎?
不是的:有可能還會發(fā)生指令重排的問題
當(dāng)有線程在進行第一次初始化的時候,就有可能發(fā)生問題👇
先來看初始化的過程
1.先分配內(nèi)存空間
2.初始化
3.將singleton指向內(nèi)存
有可能指令重排序之后:
線程1執(zhí)行的順序變成了 1 --> 3 --> 2
在線程1執(zhí)行完1、3之后時間片使用完了
線程2再來執(zhí)行,線程2得到了未初始化的singleton,也就是的到了一個空的對象
也就發(fā)生了線程不安全的問題
那么要如何解決指令重排序的問題呢?那就是使用volatile關(guān)鍵字👇:
/**
* user:ypc;
* date:2021-06-13;
* time: 21:22;
*/
//懶漢方式實現(xiàn)單例模式
public class Singleton2 {
static class Singleton {
//1.設(shè)置私有的構(gòu)造函數(shù)
private Singleton() {
}
//2.提供一個私有的靜態(tài)變量
private static volatile Singleton singleton = null;
//3.提供給外部調(diào)用,返回一個單例對象給外部
public static Singleton getInstance() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}
這樣就沒有問題了
餓漢/懶漢對比
餓漢方式: 優(yōu)點:實現(xiàn)簡單,不存在線程安全的問題,因為餓漢的方式是隨著程序的啟動而初始化的,因為類加載是線程安全的,所以它是線程安全的。缺點:隨著程序的啟動而啟動,有可能在整個程序的運行周期都沒有用到,這樣就帶來了不必要的開銷。
阻塞隊列的實現(xiàn)
import java.util.Random;
/**
* user:ypc;
* date:2021-06-14;
* time: 8:57;
*/
public class MyBlockingQueue {
private int[] values;
private int first;
private int last;
private int size;
MyBlockingQueue(int maxSize) {
this.values = new int[maxSize];
this.first = 0;
this.last = 0;
this.size = 0;
}
public void offer(int val) throws InterruptedException {
synchronized (this) {
if (this.size == values.length) {
this.wait();
}
this.values[last++] = val;
size++;
//變?yōu)檠h(huán)隊列
if (this.last == values.length) {
this.last = 0;
}
//喚醒消費者
this.notify();
}
}
public int poll() throws InterruptedException {
int result = 0;
synchronized (this) {
if (size == 0) {
this.wait();
}
result = this.values[first++];
this.size--;
if (first == this.values.length) {
this.first = 0;
}
//喚醒生產(chǎn)者開生產(chǎn)數(shù)據(jù)
this.notify();
}
return result;
}
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(100);
//生產(chǎn)者
Thread thread1 = new Thread(() -> {
while (true) {
try {
int num = new Random().nextInt(100);
myBlockingQueue.offer(num);
System.out.println("生產(chǎn)者生產(chǎn)數(shù)據(jù):" + num);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//消費者
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
int res = myBlockingQueue.poll();
System.out.println("消費者消費數(shù)據(jù):" + res);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
thread2.start();
}
}
可以看到生產(chǎn)者每生產(chǎn)一個數(shù)據(jù)都會被取走:

常見的鎖策略
樂觀鎖
它認為程序在一般的情況下不會發(fā)生問題,所以他在使用的時候不會加鎖,只有在數(shù)據(jù)修改的時候才會判斷有沒有鎖競爭,如果沒有就會直接修改數(shù)據(jù),如果有就會返回失敗信息給用戶自行處理。
CAS
樂觀鎖的經(jīng)典實現(xiàn) Compare and Swap
CAS 實現(xiàn)的三個重要的屬性:
(V,A,B)
V:內(nèi)存中的值
A:預(yù)期的舊值
B:新值
V == A? V -> B : 修改失敗
修改失之后:
自旋對比和替換
CAS 的底層實現(xiàn):
CAS在Java中是通過unsafe來實現(xiàn)的,unsafe時本地類和本地方法,它是c/c++實現(xiàn)的原生方法,通過調(diào)用操作系統(tǒng)Atomic::cmpxchg原子指令來實現(xiàn)的
CAS在java中的應(yīng)用
i++、i–問題
可以使用加鎖、ThreadLocal 解決問題
也可以使用atomic.AtomicInteger來解決問題,底層也使用了樂觀鎖。
import java.util.concurrent.atomic.AtomicInteger;
/**
* user:ypc;
* date:2021-06-14;
* time: 10:12;
*/
public class ThreadDemo1 {
private static AtomicInteger count = new AtomicInteger(0);
private static final int MaxSize = 100000;
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < MaxSize; i++) {
count.getAndIncrement();//i++
}
}
});
thread1.start();
Thread thread2 = new Thread(()->{
for (int i = 0; i < MaxSize; i++) {
count.getAndDecrement();//i--
}
});
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}

CAS 的ABA問題
當(dāng)有多個線程對一個原子類進行操作的時候,某個線程在短時間內(nèi)將原子類的值A(chǔ)修改為B,又馬上將其修改為A,此時其他線程不感知,還是會修改成功。
來看:
import java.util.concurrent.atomic.AtomicInteger;
/**
* user:ypc;
* date:2021-06-14;
* time: 10:43;
*/
public class ThreadDemo2 {
//線程操作資源,原子類ai的初始值為4
static AtomicInteger ai = new AtomicInteger(4);
public static void main(String[] args) {
new Thread(() -> {
//利用CAS將ai的值改成5
boolean b = ai.compareAndSet(4, 5);
System.out.println(Thread.currentThread().getName()+"是否成功將ai的值修改為5:"+b);
//休眠一秒
try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}
//利用CAS將ai的值改回4
b = ai.compareAndSet(5,4);
System.out.println(Thread.currentThread().getName()+"是否成功將ai的值修改為4:"+b);
},"A").start();
new Thread(() -> {
//模擬此線程執(zhí)行較慢的情況
try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}
//利用CAS將ai的值從4改為10
boolean b = ai.compareAndSet(4, 10);
System.out.println(Thread.currentThread().getName()+"是否成功將ai的值修改為10:"+b);
},"B").start();
//等待其他線程完成,為什么是2,因為一個是main線程,一個是后臺的GC線程
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println("ai最終的值為:"+ai.get());
}
}
上面例子模擬的是A、B兩個線程操作一個資源ai,A的執(zhí)行速度比B的快,在B執(zhí)行前,A就已經(jīng)將ai的值改為5之后馬上又把ai的值改回為4,但是B不感知,所以最后B就修改成功了。
那么會造成會有什么問題呢?
假設(shè)A現(xiàn)在有100元,要給B轉(zhuǎn)賬100元,點擊了兩次轉(zhuǎn)賬按鈕,第一次B只會得到100元,A現(xiàn)在剩余0元。第二次A是0元,預(yù)期的舊值是100,不相等,就不會執(zhí)行轉(zhuǎn)賬操作。
如果點擊第二次按鈕之前,A又得到了100元,B不能感知的到,此時A得到了轉(zhuǎn)賬100元,預(yù)期的舊值就是100,又會轉(zhuǎn)給B100元。
那么如何解決這個問題呢?👇
ABA 問題的解決
我們可以給操作加上版本號,每次修改的時候判斷版本號和預(yù)期的舊值,如果不一樣就不會執(zhí)行操作了。
即是預(yù)期的舊值和V值相等,但是版本號不一樣,也不會執(zhí)行操作。
在Java中的實現(xiàn):
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* user:ypc;
* date:2021-06-14;
* time: 11:05;
*/
public class ThreadDemo3 {
static AtomicStampedReference<Integer> ai = new AtomicStampedReference<>(4,0);
public static void main(String[] args) {
new Thread(() -> {
//四個參數(shù)分別是預(yù)估內(nèi)存值,更新值,預(yù)估版本號,初始版本號
//只有當(dāng)預(yù)估內(nèi)存值==實際內(nèi)存值相等并且預(yù)估版本號==實際版本號,才會進行修改
boolean b = ai.compareAndSet(4, 5,0,1);
System.out.println(Thread.currentThread().getName()+"是否成功將ai的值修改為5:"+b);
try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}
b = ai.compareAndSet(5,4,1,2);
System.out.println(Thread.currentThread().getName()+"是否成功將ai的值修改為4:"+b);
},"A").start();
new Thread(() -> {
try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}
boolean b = ai.compareAndSet(4, 10,0,1);
System.out.println(Thread.currentThread().getName()+"是否成功將ai的值修改為10:"+b);
},"B").start();
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println("ai最終的值為:"+ai.getReference());
}
}

注意:里面的舊值對比的是引用。
如果范圍在-128 - 127 里,會使用緩存的值,如果超過了這個范圍,就會重新來new對象
可以將Integer 的高速緩存的值的邊界調(diào)整
悲觀鎖
悲觀鎖認為只要執(zhí)行多線程的任務(wù),就會發(fā)生線程不安全的問題,所以正在進入方法之后會直接加鎖。
直接使用synchronzied關(guān)鍵字給方法加鎖就可以了
獨占鎖、共享鎖、自旋鎖、可重入鎖
獨占鎖:指的是這一把鎖只能被一個線程所擁有
比如:synchronzied、Lock
共享鎖: 指的是一把鎖可以被多個線程同時擁有
ReadWriterLock讀寫鎖就是共享鎖
讀鎖就是共享的,將鎖的粒度更加的細化
import java.util.Date;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* user:ypc;
* date:2021-06-14;
* time: 11:42;
*/
public class ThreadDemo4 {
//創(chuàng)建讀寫鎖
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//讀鎖
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
//寫鎖
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 1000,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(100), new ThreadPoolExecutor.DiscardPolicy());
//任務(wù)一:讀鎖演示
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "進入了讀鎖,時間:" + new Date());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
});
//任務(wù)二:讀鎖演示
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "進入了讀鎖,時間:" + new Date());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
});
//任務(wù)三:寫鎖
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "進入了寫鎖,時間:" + new Date());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
});
//任務(wù)四:寫鎖
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "進入了寫鎖,時間:" + new Date());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
});
}
}

可重入鎖:
當(dāng)一個線程擁有了鎖之后,可以重復(fù)的進入,就叫可重入鎖。
synchronzied就是典型的可重入鎖的代表
讀鎖的時間在一秒內(nèi),所以兩個線程讀到的鎖是一把鎖,即讀鎖是共享鎖
而寫鎖的時間剛好是一秒,所以寫鎖是獨占鎖。


自旋鎖:相當(dāng)于死循環(huán),一直嘗試獲取鎖
詳解synchronized鎖的優(yōu)化問題
synchroized加鎖的整個過程,都是依賴于Monitor(監(jiān)視器鎖)實現(xiàn)的,監(jiān)視器鎖在虛擬機中又是根據(jù)操作系統(tǒng)的Metux Lock(互斥量)來實現(xiàn)的,這就導(dǎo)致在加鎖的過程中需要頻繁的在操作系統(tǒng)的內(nèi)核態(tài)和和JVM級別的用戶態(tài)進行切換,并且涉及到線程上下文的切換,是比較消耗性能的。所以后來有一位大佬Doug Lea基于java實現(xiàn)了一個AQS的框架,提供了Lock鎖,性能遠遠高于synchroized。這就導(dǎo)致Oracle公司很沒有面子,因此他們在JDK1.6對synchroized做了優(yōu)化,引入了偏向鎖和輕量級鎖。存在一個從無鎖-》偏向鎖–》輕量級鎖–》重量級鎖的升級過程,優(yōu)化后性能就可以和Lock鎖的方式持平了。
對象頭
HotSpot虛擬機中,對象在內(nèi)存中分為三塊區(qū)域:對象頭、實例數(shù)據(jù)和對齊填充。

對象頭包括兩部分:Mark Word 和 類型指針。類型指針是指向該對象所屬類對象的指針,我們不關(guān)注。mark word用于存儲對象的HashCode、GC分代年齡、鎖狀態(tài)等信息。在32位系統(tǒng)上mark word長度為32bit,64位系統(tǒng)上長度為64bit。他不是一個固定的數(shù)據(jù)結(jié)構(gòu),是和對象的狀態(tài)緊密相關(guān),有一個對應(yīng)關(guān)系的,具體如下表所示:

當(dāng)某一線程第一次獲得鎖的時候,虛擬機會把對象頭中的鎖標志位設(shè)置為“01”,把偏向模式設(shè)置為“1”,表示進入偏向鎖模式。同時使用CAS操作將獲取到這個鎖的線程的ID記錄在對象的Mark Word中。如果CAS操作成功,持有偏向鎖的線程每次進入這個鎖的相關(guān)的同步塊的時候。虛擬機都可以不在進行任何的同步操作。
當(dāng)其他線程進入同步塊時,發(fā)現(xiàn)已經(jīng)有偏向的線程了,偏向模式馬上結(jié)束。根據(jù)鎖對象目前是否處于被鎖定的狀態(tài)決定是否撤銷偏向,也就是將偏向模式設(shè)置為“0”,撤銷后標志位恢復(fù)到“01”,也就是未鎖定的狀態(tài)或者輕量級鎖定,標志位為“00”的狀態(tài),后續(xù)的同步操作就按照下面的輕量級鎖那樣去執(zhí)行
1、在線程進入同步塊的時候,如果同步對象狀態(tài)為無鎖狀態(tài)(鎖標志為 01),虛擬機首先將在當(dāng)前線程的棧幀中建立一個名為鎖記錄的空間,用來存儲鎖對象目前的 Mark Word 的拷貝。拷貝成功后,虛擬機將使用 CAS 操作嘗試將對象的 Mark Word 更新為指向 Lock Record 的指針,并將 Lock Record 里的 owner 指針指向鎖對象的 Mark Word。如果更新成功,則執(zhí)行 2,否則執(zhí)行 3。

2、如果這個更新動作成功了,那么這個線程就擁有了該對象的鎖,并且鎖對象的 Mark Word 中的鎖標志位設(shè)置為 “00”,即表示此對象處于輕量級鎖定狀態(tài),這時候虛擬機線程棧與堆中鎖對象的對象頭的狀態(tài)如圖所示。

3、如果這個更新操作失敗了,虛擬機首先會檢查鎖對象的 Mark Word 是否指向當(dāng)前線程的棧幀,如果是就說明當(dāng)前線程已經(jīng)擁有了這個對象的鎖,那就可以直接進入同步塊繼續(xù)執(zhí)行。否則說明多個線程競爭鎖,輕量級鎖就要膨脹為重要量級鎖,鎖標志的狀態(tài)值變?yōu)?“10”,Mark Word 中存儲的就是指向重量級鎖的指針,后面等待鎖的線程也要進入阻塞狀態(tài)。而當(dāng)前線程便嘗試使用自旋來獲取鎖。自旋失敗后膨脹為重量級鎖,被阻塞。
Semaphore
Semaphore的作用:
在java中,使用了synchronized關(guān)鍵字和Lock鎖實現(xiàn)了資源的并發(fā)訪問控制,在同一時間只允許唯一了線程進入臨界區(qū)訪問資源(讀鎖除外),這樣子控制的主要目的是為了解決多個線程并發(fā)同一資源造成的數(shù)據(jù)不一致的問題。也就是做限流的作用
Semaphore實現(xiàn)原理:
Semaphore是用來保護一個或者多個共享資源的訪問,Semaphore內(nèi)部維護了一個計數(shù)器,其值為可以訪問的共享資源的個數(shù)。一個線程要訪問共享資源,先獲得信號量,如果信號量的計數(shù)器值大于1,意味著有共享資源可以訪問,則使其計數(shù)器值減去1,再訪問共享資源。
如果計數(shù)器值為0,線程進入休眠。當(dāng)某個線程使用完共享資源后,釋放信號量,并將信號量內(nèi)部的計數(shù)器加1,之前進入休眠的線程將被喚醒并再次試圖獲得信號量。
就好比一個廁所管理員,站在門口,只有廁所有空位,就開門允許與空側(cè)數(shù)量等量的人進入廁所。多個人進入廁所后,相當(dāng)于N個人來分配使用N個空位。為避免多個人來同時競爭同一個側(cè)衛(wèi),在內(nèi)部仍然使用鎖來控制資源的同步訪問。
Semaphore的使用:
Semaphore使用時需要先構(gòu)建一個參數(shù)來指定共享資源的數(shù)量,Semaphore構(gòu)造完成后即是獲取Semaphore、共享資源使用完畢后釋放Semaphore。
使用Semaphore 來模擬有四輛車同時到達了停車場的門口,但是停車位只有兩個,也就是只能停兩輛車,這就可以使用信號量來實現(xiàn)。👇:
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* user:ypc;
* date:2021-06-14;
* time: 14:00;
*/
public class ThreadDemo6 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 200,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(100), new ThreadPoolExecutor.DiscardPolicy());
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "到達了停車場");
try {
Thread.sleep(1000);
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "進入了停車場");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "出了了停車場");
semaphore.release();
}
});
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "到達了停車場");
try {
Thread.sleep(1000);
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "進入了停車場");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "出了了停車場");
semaphore.release();
}
});
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "到達了停車場");
try {
Thread.sleep(1000);
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "進入了停車場");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "出了了停車場");
semaphore.release();
}
});
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "到達了停車場");
try {
Thread.sleep(1000);
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "進入了停車場");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "出了了停車場");
semaphore.release();
}
});
threadPoolExecutor.shutdown();
}
}

CountDownLatch\CyclicBarrier
CountDownLatch
一個可以用來協(xié)調(diào)多個線程之間的同步,或者說起到線程之間的通信作用的工具類。
它能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續(xù)執(zhí)行。使用一個計數(shù)器進行實現(xiàn)。計數(shù)器初始值為線程的數(shù)量。當(dāng)每一個線程完成自己任務(wù)后,計數(shù)器的值就會減一。當(dāng)計數(shù)器的值為0時,表示所有的線程都已經(jīng)完成了任務(wù),然后在CountDownLatch上等待的線程就可以恢復(fù)執(zhí)行任務(wù)。
CountDownLatch的用法
某一線程在開始運行前等待n個線程執(zhí)行完畢。
將CountDownLatch的計數(shù)器初始化為n:new CountDownLatch(n) ,每當(dāng)一個任務(wù)線程執(zhí)行完畢,就將計數(shù)器減1,
countdownlatch.countDown(),當(dāng)計數(shù)器的值變?yōu)?時,在CountDownLatch上 await() 的線程就會被喚醒。一個典型應(yīng)用場景就是啟動一個服務(wù)時,主線程需要等待多個組件加載完畢,之后再繼續(xù)執(zhí)行。
實現(xiàn)多個線程開始執(zhí)行任務(wù)的最大并行性。注意是并行性,不是并發(fā),強調(diào)的是多個線程在某一時刻同時開始執(zhí)行。做法是初始化一個共享的CountDownLatch(1),將其計數(shù)器初始化為1,多個線程在開始執(zhí)行任務(wù)前首先 coundownlatch.await(),當(dāng)主線程調(diào)用 countDown() 時,計數(shù)器變?yōu)?,多個線程同時被喚醒。
CountDownLatch的不足
CountDownLatch是一次性的,計數(shù)器的值只能在構(gòu)造方法中初始化一次,之后沒有任何機制再次對其設(shè)置值,當(dāng)CountDownLatch使用完畢后,它不能再次被使用。

模擬賽跑:當(dāng)三個運動員都到達終點的時候宣布比賽結(jié)束
import java.util.Random;
import java.util.concurrent.*;
/**
* user:ypc;
* date:2021-06-14;
* time: 14:27;
*/
public class ThreadDemo7 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 200,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(100));
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "開跑");
int num = new Random().nextInt(4);
num += 1;
try {
Thread.sleep(1000*num);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "到達了終點");
countDownLatch.countDown();
}
});
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "開跑");
int num = new Random().nextInt(4);
num += 1;
try {
Thread.sleep(1000*num);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "到達了終點");
countDownLatch.countDown();
}
});
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "開跑");
int num = new Random().nextInt(4);
num += 1;
try {
Thread.sleep(1000*num);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "到達了終點");
countDownLatch.countDown();
}
});
countDownLatch.await();
System.out.println("所有的選手都到達了終點");
threadPoolExecutor.shutdown();
}
}

CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)(Cyclic)使用的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)干活。線程進入屏障通過CyclicBarrier的await()方法。
CyclicBarrier默認的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達了屏障,然后當(dāng)前線程被阻塞。
import java.util.concurrent.*;
/**
* user:ypc;
* date:2021-06-14;
* time: 15:03;
*/
public class ThreadDemo8 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println("到達了循環(huán)屏障");
}
});
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 200,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(100));
for (int i = 0; i < 10; i++) {
int finalI = i;
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(finalI * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "進入了任務(wù)");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "退出了任務(wù)");
}
});
}
threadPoolExecutor.shutdown();
}
}

CyclicBarrier原理
每當(dāng)線程執(zhí)行await,內(nèi)部變量count減1,如果count!= 0,說明有線程還未到屏障處,則在鎖條件變量trip上等待。
當(dāng)count == 0時,說明所有線程都已經(jīng)到屏障處,執(zhí)行條件變量的signalAll方法喚醒等待的線程。
其中 nextGeneration方法可以實現(xiàn)屏障的循環(huán)使用:
重新生成Generation對象
恢復(fù)count值
CyclicBarrier可以循環(huán)的使用。
hashmap/ConcurrentHashMap
hashmap在JDK1.7中頭插死循環(huán)問題
來看👇JDK1.7 hashMap transfer的源碼
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}
來看多線程情況下的問題:

這樣就會造成死循環(huán)。
hashmap在JDK1.8中值覆蓋問題
在JDK1.8的時候使用的是尾插法來看👇:
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null) // 如果沒有hash碰撞則直接插入元素
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
在多線程的情況下:

其中第六行代碼是判斷是否出現(xiàn)hash碰撞,假設(shè)兩個線程1、2都在進行put操作,并且hash函數(shù)計算出的插入下標是相同的,當(dāng)線程1執(zhí)行完第六行代碼后由于時間片耗盡導(dǎo)致被掛起,而線程2得到時間片后在該下標處插入了元素,完成了正常的插入,然后線程A獲得時間片,由于之前已經(jīng)進行了hash碰撞的判斷,所有此時不會再進行判斷,而是直接進行插入,這就導(dǎo)致了線程2插入的數(shù)據(jù)被線程1覆蓋了,從而線程不安全。
除此之前,還有就是代碼的第38行處有個++size,我們這樣想,還是線程1、2,這兩個線程同時進行put操作時,假設(shè)當(dāng)前HashMap的zise大小為10,當(dāng)線程1執(zhí)行到第38行代碼時,從主內(nèi)存中獲得size的值為10后準備進行+1操作,但是由于時間片耗盡只好讓出CPU,線程2快樂的拿到CPU還是從主內(nèi)存中拿到size的值10進行+1操作,完成了put操作并將size=11寫回主內(nèi)存,然后線程1再次拿到CPU并繼續(xù)執(zhí)行(此時size的值仍為10),當(dāng)執(zhí)行完put操作后,還是將size=11寫回內(nèi)存,此時,線程1、2都執(zhí)行了一次put操作,但是size的值只增加了1,所有說還是由于數(shù)據(jù)覆蓋又導(dǎo)致了線程不安全。
總結(jié)
這個系列的文章到這里就結(jié)束了,希望可以幫到你,請您多多關(guān)注腳本之家的更多精彩內(nèi)容!
相關(guān)文章
Java之Spring認證使用Profile配置運行環(huán)境講解
這篇文章主要介紹了Java之Spring認證使用Profile配置運行環(huán)境講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-07-07
使用Spring Boot創(chuàng)建Web應(yīng)用程序的示例代碼
本篇文章主要介紹了使用Spring Boot創(chuàng)建Web應(yīng)用程序的示例代碼,我們將使用Spring Boot構(gòu)建一個簡單的Web應(yīng)用程序,并為其添加一些有用的服務(wù),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05

