基于redis分布式鎖實(shí)現(xiàn)秒殺功能
最近在項(xiàng)目中遇到了類似“秒殺”的業(yè)務(wù)場景,在本篇博客中,我將用一個(gè)非常簡單的demo,闡述實(shí)現(xiàn)所謂“秒殺”的基本思路。
業(yè)務(wù)場景
所謂秒殺,從業(yè)務(wù)角度看,是短時(shí)間內(nèi)多個(gè)用戶“爭搶”資源,這里的資源在大部分秒殺場景里是商品;將業(yè)務(wù)抽象,技術(shù)角度看,秒殺就是多個(gè)線程對資源進(jìn)行操作,所以實(shí)現(xiàn)秒殺,就必須控制線程對資源的爭搶,既要保證高效并發(fā),也要保證操作的正確。
一些可能的實(shí)現(xiàn)
剛才提到過,實(shí)現(xiàn)秒殺的關(guān)鍵點(diǎn)是控制線程對資源的爭搶,根據(jù)基本的線程知識,可以不加思索的想到下面的一些方法:
1、秒殺在技術(shù)層面的抽象應(yīng)該就是一個(gè)方法,在這個(gè)方法里可能的操作是將商品庫存-1,將商品加入用戶的購物車等等,在不考慮緩存的情況下應(yīng)該是要操作數(shù)據(jù)庫的。那么最簡單直接的實(shí)現(xiàn)就是在這個(gè)方法上加上synchronized關(guān)鍵字,通俗的講就是鎖住整個(gè)方法;
2、鎖住整個(gè)方法這個(gè)策略簡單方便,但是似乎有點(diǎn)粗暴??梢陨晕?yōu)化一下,只鎖住秒殺的代碼塊,比如寫數(shù)據(jù)庫的部分;
3、既然有并發(fā)問題,那我就讓他“不并發(fā)”,將所有的線程用一個(gè)隊(duì)列管理起來,使之變成串行操作,自然不會有并發(fā)問題。
上面所述的方法都是有效的,但是都不好。為什么?第一和第二種方法本質(zhì)上是“加鎖”,但是鎖粒度依然比較高。什么意思?試想一下,如果兩個(gè)線程同時(shí)執(zhí)行秒殺方法,這兩個(gè)線程操作的是不同的商品,從業(yè)務(wù)上講應(yīng)該是可以同時(shí)進(jìn)行的,但是如果采用第一二種方法,這兩個(gè)線程也會去爭搶同一個(gè)鎖,這其實(shí)是不必要的。第三種方法也沒有解決上面說的問題。
那么如何將鎖控制在更細(xì)的粒度上呢?可以考慮為每個(gè)商品設(shè)置一個(gè)互斥鎖,以和商品ID相關(guān)的字符串為唯一標(biāo)識,這樣就可以做到只有爭搶同一件商品的線程互斥,不會導(dǎo)致所有的線程互斥。分布式鎖恰好可以幫助我們解決這個(gè)問題。
何為分布式鎖
分布式鎖是控制分布式系統(tǒng)之間同步訪問共享資源的一種方式。在分布式系統(tǒng)中,常常需要協(xié)調(diào)他們的動作。如果不同的系統(tǒng)或是同一個(gè)系統(tǒng)的不同主機(jī)之間共享了一個(gè)或一組資源,那么訪問這些資源的時(shí)候,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下,便需要使用到分布式鎖。
我們來假設(shè)一個(gè)最簡單的秒殺場景:數(shù)據(jù)庫里有一張表,column分別是商品ID,和商品ID對應(yīng)的庫存量,秒殺成功就將此商品庫存量-1?,F(xiàn)在假設(shè)有1000個(gè)線程來秒殺兩件商品,500個(gè)線程秒殺第一個(gè)商品,500個(gè)線程秒殺第二個(gè)商品。我們來根據(jù)這個(gè)簡單的業(yè)務(wù)場景來解釋一下分布式鎖。
通常具有秒殺場景的業(yè)務(wù)系統(tǒng)都比較復(fù)雜,承載的業(yè)務(wù)量非常巨大,并發(fā)量也很高。這樣的系統(tǒng)往往采用分布式的架構(gòu)來均衡負(fù)載。那么這1000個(gè)并發(fā)就會是從不同的地方過來,商品庫存就是共享的資源,也是這1000個(gè)并發(fā)爭搶的資源,這個(gè)時(shí)候我們需要將并發(fā)互斥管理起來。這就是分布式鎖的應(yīng)用。
而key-value存儲系統(tǒng),如redis,因?yàn)槠湟恍┨匦?,是?shí)現(xiàn)分布式鎖的重要工具。
具體的實(shí)現(xiàn)
先來看看一些redis的基本命令:
SETNX key value
如果key不存在,就設(shè)置key對應(yīng)字符串value。在這種情況下,該命令和SET一樣。當(dāng)key已經(jīng)存在時(shí),就不做任何操作。SETNX是”SET if Not eXists”。
expire KEY seconds
設(shè)置key的過期時(shí)間。如果key已過期,將會被自動刪除。
del KEY
刪除key
由于筆者的實(shí)現(xiàn)只用到這三個(gè)命令,就只介紹這三個(gè)命令,更多的命令以及redis的特性和使用,可以參考redis官網(wǎng)。
需要考慮的問題
1、用什么操作redis?幸虧redis已經(jīng)提供了jedis客戶端用于java應(yīng)用程序,直接調(diào)用jedis API即可。
2、怎么實(shí)現(xiàn)加鎖?“鎖”其實(shí)是一個(gè)抽象的概念,將這個(gè)抽象概念變?yōu)榫唧w的東西,就是一個(gè)存儲在redis里的key-value對,key是于商品ID相關(guān)的字符串來唯一標(biāo)識,value其實(shí)并不重要,因?yàn)橹灰@個(gè)唯一的key-value存在,就表示這個(gè)商品已經(jīng)上鎖。
3、如何釋放鎖?既然key-value對存在就表示上鎖,那么釋放鎖就自然是在redis里刪除key-value對。
4、阻塞還是非阻塞?筆者采用了阻塞式的實(shí)現(xiàn),若線程發(fā)現(xiàn)已經(jīng)上鎖,會在特定時(shí)間內(nèi)輪詢鎖。
5、如何處理異常情況?比如一個(gè)線程把一個(gè)商品上了鎖,但是由于各種原因,沒有完成操作(在上面的業(yè)務(wù)場景里就是沒有將庫存-1寫入數(shù)據(jù)庫),自然沒有釋放鎖,這個(gè)情況筆者加入了鎖超時(shí)機(jī)制,利用redis的expire命令為key設(shè)置超時(shí)時(shí)長,過了超時(shí)時(shí)間redis就會將這個(gè)key自動刪除,即強(qiáng)制釋放鎖(可以認(rèn)為超時(shí)釋放鎖是一個(gè)異步操作,由redis完成,應(yīng)用程序只需要根據(jù)系統(tǒng)特點(diǎn)設(shè)置超時(shí)時(shí)間即可)。
talk is cheap,show me the code
在代碼實(shí)現(xiàn)層面,注解有并發(fā)的方法和參數(shù),通過動態(tài)代理獲取注解的方法和參數(shù),在代理中加鎖,執(zhí)行完被代理的方法后釋放鎖。
幾個(gè)注解定義:
cachelock是方法級的注解,用于注解會產(chǎn)生并發(fā)問題的方法:
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface CacheLock { String lockedPrefix() default "";//redis 鎖key的前綴 long timeOut() default 2000;//輪詢鎖的時(shí)間 int expireTime() default 1000;//key在redis里存在的時(shí)間,1000S }
lockedObject是參數(shù)級的注解,用于注解商品ID等基本類型的參數(shù):
@Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface LockedObject { //不需要值 }
LockedComplexObject也是參數(shù)級的注解,用于注解自定義類型的參數(shù):
@Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface LockedComplexObject { String field() default "";//含有成員變量的復(fù)雜對象中需要加鎖的成員變量,如一個(gè)商品對象的商品ID }
CacheLockInterceptor實(shí)現(xiàn)InvocationHandler接口,在invoke方法中獲取注解的方法和參數(shù),在執(zhí)行注解的方法前加鎖,執(zhí)行被注解的方法后釋放鎖:
public class CacheLockInterceptor implements InvocationHandler{ public static int ERROR_COUNT = 0; private Object proxied; public CacheLockInterceptor(Object proxied) { this.proxied = proxied; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { CacheLock cacheLock = method.getAnnotation(CacheLock.class); //沒有cacheLock注解,pass if(null == cacheLock){ System.out.println("no cacheLock annotation"); return method.invoke(proxied, args); } //獲得方法中參數(shù)的注解 Annotation[][] annotations = method.getParameterAnnotations(); //根據(jù)獲取到的參數(shù)注解和參數(shù)列表獲得加鎖的參數(shù) Object lockedObject = getLockedObject(annotations,args); String objectValue = lockedObject.toString(); //新建一個(gè)鎖 RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue); //加鎖 boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime()); if(!result){//取鎖失敗 ERROR_COUNT += 1; throw new CacheLockException("get lock fail"); } try{ //加鎖成功,執(zhí)行方法 return method.invoke(proxied, args); }finally{ lock.unlock();//釋放鎖 } } /** * * @param annotations * @param args * @return * @throws CacheLockException */ private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{ if(null == args || args.length == 0){ throw new CacheLockException("方法參數(shù)為空,沒有被鎖定的對象"); } if(null == annotations || annotations.length == 0){ throw new CacheLockException("沒有被注解的參數(shù)"); } //不支持多個(gè)參數(shù)加鎖,只支持第一個(gè)注解為lockedObject或者lockedComplexObject的參數(shù) int index = -1;//標(biāo)記參數(shù)的位置指針 for(int i = 0;i < annotations.length;i++){ for(int j = 0;j < annotations[i].length;j++){ if(annotations[i][j] instanceof LockedComplexObject){//注解為LockedComplexObject index = i; try { return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field()); } catch (NoSuchFieldException | SecurityException e) { throw new CacheLockException("注解對象中沒有該屬性" + ((LockedComplexObject)annotations[i][j]).field()); } } if(annotations[i][j] instanceof LockedObject){ index = i; break; } } //找到第一個(gè)后直接break,不支持多參數(shù)加鎖 if(index != -1){ break; } } if(index == -1){ throw new CacheLockException("請指定被鎖定參數(shù)"); } return args[index]; } }
最關(guān)鍵的RedisLock類中的lock方法和unlock方法:
/** * 加鎖 * 使用方式為: * lock(); * try{ * executeMethod(); * }finally{ * unlock(); * } * @param timeout timeout的時(shí)間范圍內(nèi)輪詢鎖 * @param expire 設(shè)置鎖超時(shí)時(shí)間 * @return 成功 or 失敗 */ public boolean lock(long timeout,int expire){ long nanoTime = System.nanoTime(); timeout *= MILLI_NANO_TIME; try { //在timeout的時(shí)間范圍內(nèi)不斷輪詢鎖 while (System.nanoTime() - nanoTime < timeout) { //鎖不存在的話,設(shè)置鎖并設(shè)置鎖過期時(shí)間,即加鎖 if (this.redisClient.setnx(this.key, LOCKED) == 1) { this.redisClient.expire(key, expire);//設(shè)置鎖過期時(shí)間是為了在沒有釋放 //鎖的情況下鎖過期后消失,不會造成永久阻塞 this.lock = true; return this.lock; } System.out.println("出現(xiàn)鎖等待"); //短暫休眠,避免可能的活鎖 Thread.sleep(3, RANDOM.nextInt(30)); } } catch (Exception e) { throw new RuntimeException("locking error",e); } return false; } public void unlock() { try { if(this.lock){ redisClient.delKey(key);//直接刪除 } } catch (Throwable e) { } }
上述的代碼是框架性的代碼,現(xiàn)在來講解如何使用上面的簡單框架來寫一個(gè)秒殺函數(shù)。
先定義一個(gè)接口,接口里定義了一個(gè)秒殺方法:
public interface SeckillInterface { /** *現(xiàn)在暫時(shí)只支持在接口方法上注解 */ //cacheLock注解可能產(chǎn)生并發(fā)的方法 @CacheLock(lockedPrefix="TEST_PREFIX") public void secKill(String userID,@LockedObject Long commidityID);//最簡單的秒殺方法,參數(shù)是用戶ID和商品ID??赡苡卸鄠€(gè)線程爭搶一個(gè)商品,所以商品ID加上LockedObject注解 }
上述SeckillInterface接口的實(shí)現(xiàn)類,即秒殺的具體實(shí)現(xiàn):
public class SecKillImpl implements SeckillInterface{ static Map<Long, Long> inventory ; static{ inventory = new HashMap<>(); inventory.put(10000001L, 10000l); inventory.put(10000002L, 10000l); } @Override public void secKill(String arg1, Long arg2) { //最簡單的秒殺,這里僅作為demo示例 reduceInventory(arg2); } //模擬秒殺操作,姑且認(rèn)為一個(gè)秒殺就是將庫存減一,實(shí)際情景要復(fù)雜的多 public Long reduceInventory(Long commodityId){ inventory.put(commodityId,inventory.get(commodityId) - 1); return inventory.get(commodityId); } }
模擬秒殺場景,1000個(gè)線程來爭搶兩個(gè)商品:
@Test public void testSecKill(){ int threadCount = 1000; int splitPoint = 500; CountDownLatch endCount = new CountDownLatch(threadCount); CountDownLatch beginCount = new CountDownLatch(1); SecKillImpl testClass = new SecKillImpl(); Thread[] threads = new Thread[threadCount]; //起500個(gè)線程,秒殺第一個(gè)商品 for(int i= 0;i < splitPoint;i++){ threads[i] = new Thread(new Runnable() { public void run() { try { //等待在一個(gè)信號量上,掛起 beginCount.await(); //用動態(tài)代理的方式調(diào)用secKill方法 SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass)); proxy.secKill("test", commidityId1); endCount.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); threads[i].start(); } //再起500個(gè)線程,秒殺第二件商品 for(int i= splitPoint;i < threadCount;i++){ threads[i] = new Thread(new Runnable() { public void run() { try { //等待在一個(gè)信號量上,掛起 beginCount.await(); //用動態(tài)代理的方式調(diào)用secKill方法 SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass)); proxy.secKill("test", commidityId2); //testClass.testFunc("test", 10000001L); endCount.countDown(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); threads[i].start(); } long startTime = System.currentTimeMillis(); //主線程釋放開始信號量,并等待結(jié)束信號量,這樣做保證1000個(gè)線程做到完全同時(shí)執(zhí)行,保證測試的正確性 beginCount.countDown(); try { //主線程等待結(jié)束信號量 endCount.await(); //觀察秒殺結(jié)果是否正確 System.out.println(SecKillImpl.inventory.get(commidityId1)); System.out.println(SecKillImpl.inventory.get(commidityId2)); System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT); System.out.println("total cost " + (System.currentTimeMillis() - startTime)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
在正確的預(yù)想下,應(yīng)該每個(gè)商品的庫存都減少了500,在多次試驗(yàn)后,實(shí)際情況符合預(yù)想。如果不采用鎖機(jī)制,會出現(xiàn)庫存減少499,498的情況。
這里采用了動態(tài)代理的方法,利用注解和反射機(jī)制得到分布式鎖ID,進(jìn)行加鎖和釋放鎖操作。當(dāng)然也可以直接在方法進(jìn)行這些操作,采用動態(tài)代理也是為了能夠?qū)㈡i操作代碼集中在代理中,便于維護(hù)。
通常秒殺場景發(fā)生在web項(xiàng)目中,可以考慮利用spring的AOP特性將鎖操作代碼置于切面中,當(dāng)然AOP本質(zhì)上也是動態(tài)代理。
小結(jié)
這篇文章從業(yè)務(wù)場景出發(fā),從抽象到實(shí)現(xiàn)闡述了如何利用redis實(shí)現(xiàn)分布式鎖,完成簡單的秒殺功能,也記錄了筆者思考的過程,希望能給閱讀到本篇文章的人一些啟發(fā)。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Deepin UOS編譯安裝Redis的實(shí)現(xiàn)步驟
本文主要介紹了Deepin UOS編譯安裝Redis的實(shí)現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01Redis自動化安裝及集群實(shí)現(xiàn)搭建過程
這篇文章主要介紹了Redis自動化安裝以及集群實(shí)現(xiàn)搭建過程,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-09-09Redis集群服務(wù)器的實(shí)現(xiàn)(圖文步驟)
本文介紹了Redis集群服務(wù)器的優(yōu)勢,為讀者提供了全面的Redis集群服務(wù)器知識和使用技巧,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09利用Redis實(shí)現(xiàn)SQL伸縮的方法簡介
這篇文章主要介紹了利用Redis實(shí)現(xiàn)SQL伸縮的方法,包括講到了鎖和時(shí)間序列等方面來提升傳統(tǒng)數(shù)據(jù)庫的性能,需要的朋友可以參考下2015-06-06攔截Redis命令導(dǎo)致的Lua腳本執(zhí)行失敗的問題解決
本文主要介紹了攔截Redis命令導(dǎo)致的Lua腳本執(zhí)行失敗的問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06