java通過信號(hào)量實(shí)現(xiàn)限流的示例
信號(hào)量(Semaphore)是 Java 多線程并發(fā)中的一種 JDK 內(nèi)置同步器,通過它可以實(shí)現(xiàn)多線程對(duì)公共資源的并發(fā)訪問控制。
信號(hào)量由來
限流器
信號(hào)量的主要應(yīng)用場景是控制最多 N 個(gè)線程同時(shí)地訪問資源,其中計(jì)數(shù)器的最大值即是許可的最大值 N。
現(xiàn)在我們需要開發(fā)一個(gè)限流器,同一時(shí)刻最多有10個(gè)請(qǐng)求可以執(zhí)行。對(duì)于這樣的需求,我們實(shí)現(xiàn)的方案有:
- 使用Atomic類
- 使用Lock
- 使用條件變量
- 使用信號(hào)量
使用Atomic類實(shí)現(xiàn)
public class LimitByAtomic {
?
?private static final AtomicInteger COUNTER = new AtomicInteger(10);
?
?public void f() {
? ?int count = COUNTER.decrementAndGet();
? ?if (count < 0) {
? ? ?COUNTER.incrementAndGet();
? ? ?System.out.println("拒絕執(zhí)行業(yè)務(wù)邏輯");
? ? ?return; // 拒絕執(zhí)行業(yè)務(wù)邏輯
? }
?
? ?try {
? ? ?// 執(zhí)行業(yè)務(wù)邏輯
? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯");
? } finally {
? ? ?COUNTER.incrementAndGet();
? }
}
}使用Lock實(shí)現(xiàn)
public class LimitByLock {
?
?private int count = 10;
?
?public void f() {
? ?if (count <= 0) {
? ? ?System.out.println("拒絕執(zhí)行業(yè)務(wù)邏輯");
? ? ?return;
? }
?
? ?synchronized (this) {
? ? ?if (count <= 0) {
? ? ? ?System.out.println("拒絕執(zhí)行業(yè)務(wù)邏輯");
? ? ? ?return;
? ? }
? ? ?count--;
? }
?
? ?try {
? ? ?// 執(zhí)行業(yè)務(wù)邏輯
? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯");
? } finally {
? ? ?synchronized (this) {
? ? ? ?count++;
? ? }
? }
}
}使用條件變量實(shí)現(xiàn)
對(duì)于使用Atomic類還是Lock這兩種實(shí)現(xiàn)方式,都有一個(gè)缺點(diǎn),如果10個(gè)線程同時(shí)執(zhí)行,當(dāng)?shù)?1個(gè)線程來執(zhí)行的時(shí)候,會(huì)被拒絕掉,這樣就沒有執(zhí)行業(yè)務(wù)邏輯的機(jī)會(huì),造成請(qǐng)求丟失。
所以我們可以通過線程等待-通知機(jī)制來解決上面的問題。如果10個(gè)線程同時(shí)執(zhí)行,當(dāng)?shù)?1個(gè)線程來執(zhí)行的時(shí)候,先阻塞這第11個(gè)線程,等待前面的10個(gè)線程只要執(zhí)行完一個(gè),就通知第11個(gè)線程來執(zhí)行。
public class LimitByCondition {
?
?private int count = 10;
?
?public void f() throws Exception {
? ?synchronized (this) {
? ? ?while (count <= 0) {
? ? ? ?System.out.println("等待執(zhí)行業(yè)務(wù)邏輯");
? ? ? ?this.wait();
? ? }
? ? ?count--;
? }
?
? ?try {
? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯");
? } finally {
? ? ?synchronized (this) {
? ? ? ?count++;
? ? ? ?this.notifyAll();
? ? }
? }
}
}使用Semaphore實(shí)現(xiàn)
除了使用條件變量,java sdk中還可以使用Semaphore來實(shí)現(xiàn)。
public class LimitBySemaphore {
?
?private final Semaphore semaphore = new Semaphore(10);
?
?public void f() throws Exception {
? ?semaphore.acquire();
? ?try {
? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯");
? } finally {
? ? ?semaphore.release();
? }
}
}接下來我們就來探討一下Semaphore的實(shí)現(xiàn)原理。
Semaphore實(shí)現(xiàn)原理
信號(hào)量模型
實(shí)際上Semaphore的實(shí)現(xiàn)原理非常簡單,總結(jié)下來就是:一個(gè)計(jì)數(shù)器,一個(gè)等待隊(duì)列,三個(gè)方法。
在信號(hào)量模型里,計(jì)數(shù)器和等待隊(duì)列對(duì)外是透明,所以只能通過信號(hào)量模型提供的三個(gè)方法訪問,init(),down(),up()--這些方法都是原子性的。
init():設(shè)置計(jì)數(shù)器的初始值。
down():計(jì)數(shù)器的值減1;如果此時(shí)計(jì)數(shù)器的值小于0,則當(dāng)前線程將被阻塞,否則當(dāng)前線程可以繼續(xù)執(zhí)行。
up():計(jì)數(shù)器的值加1;如果此時(shí)計(jì)數(shù)器的值小于等于0,則喚醒等待隊(duì)列中的一個(gè)線程,并將其從等待隊(duì)列中移除。
class MySemaphore{
?// 計(jì)數(shù)器
?int count;
?// 等待隊(duì)列
?Queue queue;
?// 初始化操作
?MySemaphore(int c){
? ?this.count=c;
}
?//
?void down(){
? ?this.count--;
? ?if(this.count<0){
? ? ?// 將當(dāng)前線程插入等待隊(duì)列
? ? ?// 阻塞當(dāng)前線程
? }
}
?void up(){
? ?this.count++;
? ?if(this.count<=0) {
? ? ?// 移除等待隊(duì)列中的某個(gè)線程 T
? ? ?// 喚醒線程 T
? }
}
}
?使用方法如下:
static int count;
// 初始化信號(hào)量
static final MySemaphore s
? ?= new MySemaphore(1);
// 用信號(hào)量保證互斥 ? ?
static void addOne() {
?s.down();
?try {
? ?count+=1;
} finally {
? ?s.up();
}
}
?實(shí)際上信號(hào)量模型,down()、up() 這兩個(gè)操作歷史上最早稱為 P 操作和 V 操作,所以信號(hào)量模型也被稱為 PV 原語。
Java Semaphore的實(shí)現(xiàn)
public class Semaphore implements java.io.Serializable {
public void acquire() throws InterruptedException;
?public void acquireUninterruptibly();
?public boolean tryAcquire();
?public boolean tryAcquire(long timeout, TimeUnit unit);
?public void release();
?public void acquire(int permits) throws InterruptedException;
?public void acquireUninterruptibly(int permits) ;
?public boolean tryAcquire(int permits);
?public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
? ?throws InterruptedException;
?public void release(int permits);
}Java Semaphore的實(shí)現(xiàn),acquire()對(duì)應(yīng)信號(hào)量模型里的down()方法,release()對(duì)應(yīng)信號(hào)量模型里的up()方法。
Semaphore類提供的常用方法有以下幾個(gè)。我們可以粗略地將以下方法分為兩組。前五個(gè)為一組,默認(rèn)一次獲取或釋放的許可(permit)個(gè)數(shù)為1。后五個(gè)為一組,可以指定一次獲取或釋放的許可個(gè)數(shù)。對(duì)于每組方法來說,都有4個(gè)不同的獲取許可的方法:可中斷獲取、不可中斷獲取、非阻塞獲取、可超時(shí)獲取,這跟Lock提供的各種加鎖方法非常相似。
Java Semaphore的實(shí)現(xiàn)也是基于AQS來實(shí)現(xiàn)的,跟ReentrantLock一樣,Semaphore中的AQS也有公平鎖與非公平鎖這兩種實(shí)現(xiàn)。
public class Semaphore implements java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer {
? }
?
? ?// 非公平鎖
? ?static final class NonfairSync extends Sync {
? }
? ?// 公平鎖
? ?static final class FairSync extends Sync {
? }
? ?// 默認(rèn)使用非公平鎖
? ?public Semaphore(int permits) {
? ? ? ?sync = new NonfairSync(permits);
? }
?
? ?public Semaphore(int permits, boolean fair) {
? ? ? ?sync = fair ? new FairSync(permits) : new NonfairSync(permits);
? }
}Semaphore可以看做是一種共享鎖,因此,F(xiàn)airSync類和NofairSync類實(shí)現(xiàn)了AQS的tryAcquireShared()抽象方法,不過,實(shí)現(xiàn)邏輯并不相同。對(duì)于tryReleaseShared()抽象方法,因?yàn)樵贔airSync和NofairSync中的實(shí)現(xiàn)邏輯相同,因此,它被放置于FairSync和NofairSync的公共父類Sync中。
acquire()實(shí)現(xiàn)如下:
// java.util.concurrent.Semaphore#acquire()
public void acquire() throws InterruptedException {
?sync.acquireSharedInterruptibly(1);
}
?
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
? ? ? ? ? ?throws InterruptedException {
?// 先判斷線程有沒有被中斷
?if (Thread.interrupted())
? ?throw new InterruptedException();
?// 嘗試獲取共享鎖,如果獲取許可失敗,返回值<0, 需要進(jìn)入等待隊(duì)列
?if (tryAcquireShared(arg) < 0)
? ?doAcquireSharedInterruptibly(arg); // 排隊(duì)等待隊(duì)列
}
?tryAcquireShared()實(shí)現(xiàn)
// java.util.concurrent.Semaphore.FairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
?for (;;) {
? ?if (hasQueuedPredecessors()) // 比非公平鎖多了這一行
? ? ?return -1;
? ?int available = getState();
? ?int remaining = available - acquires;
? ?if (remaining < 0 ||
? ? ? ?compareAndSetState(available, remaining))
? ? ?return remaining;
}
}
?
// java.util.concurrent.Semaphore.Sync#nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
?for (;;) {
? ?int available = getState();
? ?int remaining = available - acquires;
? ?if (remaining < 0 ||
? ? ? ?compareAndSetState(available, remaining))
? ? ?return remaining;
}
}以上兩個(gè)tryAcquireShared()函數(shù)的代碼實(shí)現(xiàn)基本相同。許可個(gè)數(shù)存放在AQS的state變量中,兩個(gè)函數(shù)都是通過自旋+CAS的方式來獲取許可。兩個(gè)函數(shù)唯一的區(qū)別在于,對(duì)于公平模式下的Semaphore,當(dāng)線程調(diào)用tryAcquireShared()函數(shù)時(shí),如果等待隊(duì)列中有等待許可的線程,那么,線程將直接去排隊(duì)等待許可,而不是像非公平模式下的Semaphore那樣,線程可以插隊(duì)直接競爭許可。
release()實(shí)現(xiàn)
// java.util.concurrent.Semaphore#release()
public void release() {
?sync.releaseShared(1);
}
?
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
?// 嘗試釋放許可
?if (tryReleaseShared(arg)) {
? ?// 喚醒等待隊(duì)列其中一個(gè)線程
? ?doReleaseShared();
? ?return true;
}
?return false;
}
?
// java.util.concurrent.Semaphore.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
?// 采用自旋 + CAS來更新state
?for (;;) {
? ?int current = getState();
? ?int next = current + releases;
? ?if (next < current) // overflow
? ? ?throw new Error("Maximum permit count exceeded");
? ?if (compareAndSetState(current, next))
? ? ?return true;
}
}總結(jié)
semaphore其中一個(gè)功能是lock不容易實(shí)現(xiàn)的,那就是:semaphore可以允許多個(gè)線程訪問同一個(gè)臨界區(qū)。
比較常見的需求就是我們工作中遇到各種池化資源,例如連接池,對(duì)象池,線程池等等。其中,最熟悉的可能是數(shù)據(jù)庫連接池,在同一時(shí)刻,一定是允許多個(gè)線程同時(shí)使用連接池的,當(dāng)然,每個(gè)鏈接在被釋放前,是不允許其他線程使用的。
對(duì)象池:一次性創(chuàng)建出N個(gè)對(duì)象,之后所有的線程重復(fù)利用這N個(gè)對(duì)象,對(duì)象在被釋放前,也是不允許其他線程使用的。對(duì)象池,可以用List保存實(shí)例對(duì)象。
class ObjPool<T, R> {
?final List<T> pool;
?// 用信號(hào)量實(shí)現(xiàn)限流器
?final Semaphore sem;
?// 構(gòu)造函數(shù)
?ObjPool(int size, T t){
? ?pool = new Vector<T>(){};
? ?for(int i=0; i<size; i++){
? ? ?pool.add(t);
? }
? ?sem = new Semaphore(size);
}
?// 利用對(duì)象池的對(duì)象,調(diào)用 func 限流
?R exec(Function<T,R> func) {
? ?T t = null;
? ?sem.acquire();
? ?try {
? ? ?t = pool.remove(0);
? ? ?return func.apply(t);
? } finally {
? ? ?pool.add(t);
? ? ?sem.release();
? }
}
}
// 創(chuàng)建對(duì)象池
ObjPool<Long, String> pool =
?new ObjPool<Long, String>(10, 2);
// 通過對(duì)象池獲取 t,之后執(zhí)行 ?
pool.exec(t -> {
? ?System.out.println(t);
? ?return t.toString();
});
?到此這篇關(guān)于java通過信號(hào)量實(shí)現(xiàn)限流的示例的文章就介紹到這了,更多相關(guān)java 限流內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- java實(shí)現(xiàn)單機(jī)限流
- Java限流實(shí)現(xiàn)的幾種方法詳解
- 詳解5種Java中常見限流算法
- Java服務(wù)限流算法的6種實(shí)現(xiàn)
- Java中實(shí)現(xiàn)接口限流的方案詳解
- 關(guān)于Java限流功能的簡單實(shí)現(xiàn)
- 使用Java自定義注解實(shí)現(xiàn)一個(gè)簡單的令牌桶限流器
- Java中常見的4種限流算法詳解
- Java實(shí)現(xiàn)限流接口的示例詳解
- Java面試之限流的實(shí)現(xiàn)方式小結(jié)
- Java代碼實(shí)現(xiàn)四種限流算法詳細(xì)介紹
相關(guān)文章
Java算法之BFS,DFS,動(dòng)態(tài)規(guī)劃和貪心算法的實(shí)現(xiàn)
廣度優(yōu)先搜索(BFS)和深度優(yōu)先搜索(DFS)是圖遍歷算法中最常見的兩種算法,主要用于解決搜索和遍歷問題。動(dòng)態(tài)規(guī)劃和貪心算法則用來解決優(yōu)化問題。本文就來看看這些算法的具體實(shí)現(xiàn)吧2023-04-04
MyBatis-Plus如何最優(yōu)雅最簡潔地完成數(shù)據(jù)庫操作
Mybatis-Plus是一個(gè)?Mybatis?的增強(qiáng)工具,在?Mybatis?的基礎(chǔ)上只做增強(qiáng)不做改變,為簡化開發(fā)、提高效率而生,下面這篇文章主要給大家介紹了關(guān)于MyBatis-Plus如何最優(yōu)雅最簡潔地完成數(shù)據(jù)庫操作的相關(guān)資料,需要的朋友可以參考下2022-03-03
Spring?Boot?根據(jù)配置決定服務(wù)(集群、單機(jī))是否使用某些主件的操作代碼
這篇文章主要介紹了Spring?Boot根據(jù)配置決定服務(wù)(集群、單機(jī))是否使用某些主件,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2025-04-04

