Redis實(shí)現(xiàn)分布式鎖和等待序列的方法示例
在集群下,經(jīng)常會(huì)因?yàn)橥瑫r(shí)處理發(fā)生資源爭(zhēng)搶和并發(fā)問題,但是我們都知道同步鎖 synchronized 、 cas 、 ReentrankLock 這些鎖的作用范圍都是 JVM ,說(shuō)白了在集群下沒啥用。這時(shí)我們就需要能在多臺(tái) JVM 之間決定執(zhí)行順序的鎖了,現(xiàn)在分布式鎖主要有 redis 、 Zookeeper 實(shí)現(xiàn)的,還有數(shù)據(jù)庫(kù)的方式,不過(guò)性能太差,也就是需要一個(gè)第三方的監(jiān)管。
背景
最近在做一個(gè)消費(fèi) Kafka 消息的時(shí)候發(fā)現(xiàn),由于線上的消費(fèi)者過(guò)多,經(jīng)常會(huì)遇到,多個(gè)機(jī)器同時(shí)處理一個(gè)主鍵類型的數(shù)據(jù)的情況發(fā)生,如果最后是執(zhí)行更新操作的話,也就是一個(gè)更新順序的問題,但是如果恰好都需要插入數(shù)據(jù)的時(shí)候,會(huì)出現(xiàn)主鍵重復(fù)的問題。這是生產(chǎn)上不被允許的(因?yàn)楣居挟惓1O(jiān)管的機(jī)制,扣分啥的),這是就需要個(gè)分布式鎖了,斟酌后用了 Redis 的實(shí)現(xiàn)方式(因?yàn)榫W(wǎng)上例子多)
分析
redis 實(shí)現(xiàn)的分布式鎖,實(shí)現(xiàn)原理是 set 方法,因?yàn)槎鄠€(gè)線程同時(shí)請(qǐng)求的時(shí)候,只有一個(gè)線程可以成功并返回結(jié)果,還可以設(shè)置有效期,來(lái)避免死鎖的發(fā)生,一切都是這么的完美,不過(guò)有個(gè)問題,在 set 的時(shí)候,會(huì)直接返回結(jié)果,成功或者失敗,不具有阻塞效果,需要我們自己對(duì)失敗的線程進(jìn)程處理,有兩種方式
- 丟棄
- 等待重試 由于我們的系統(tǒng)需要這些數(shù)據(jù),那么只能重新嘗試獲取。這里使用 redis 的 List 類型實(shí)現(xiàn)等待序列的作用
代碼
直接上代碼 其實(shí)直接redis的工具類就可以解決了
package com.test import redis.clients.jedis.Jedis; import java.util.Collections; import java.util.List; /** * @desc redis隊(duì)列實(shí)現(xiàn)方式 * @anthor * @date **/ public class RedisUcUitl { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private RedisUcUitl() { } /** * logger **/ /** * 存儲(chǔ)redis隊(duì)列順序存儲(chǔ) 在隊(duì)列首部存入 * * @param key 字節(jié)類型 * @param value 字節(jié)類型 */ public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) { return jedis.lpush(key, value); } /** * 移除列表中最后一個(gè)元素 并將改元素添加入另一個(gè)列表中 ,當(dāng)列表為空時(shí) 將阻塞連接 直到等待超時(shí) * * @param srckey * @param dstkey * @param timeout 0 表示永不超時(shí) * @return */ public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) { return jedis.brpoplpush(srckey, dstkey, timeout); } /** * 返回制定的key,起始位置的redis數(shù)據(jù) * @param redisKey * @param start * @param end -1 表示到最后 * @return */ public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) { return jedis.lrange(redisKey, start, end); } /** * 刪除key * @param redisKey */ public static void delete(Jedis jedis, final byte[] redisKey) { return jedis.del(redisKey); } /** * 嘗試加鎖 * @param lockKey key名稱 * @param requestId 身份標(biāo)識(shí) * @param expireTime 過(guò)期時(shí)間 * @return */ public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); return LOCK_SUCCESS.equals(result); } /** * 釋放鎖 * @param lockKey key名稱 * @param requestId 身份標(biāo)識(shí) * @return */ public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) { final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); return RELEASE_SUCCESS.equals(result); } }
業(yè)務(wù)邏輯主要代碼如下
1.先消耗隊(duì)列中的
while(true){ // 消費(fèi)隊(duì)列 try{ // 被放入redis隊(duì)列的數(shù)據(jù) 序列化后的 byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1); if(bytes == null || bytes.isEmpty()){ // 隊(duì)列中沒數(shù)據(jù)時(shí)退出 break; } // 反序列化對(duì)象 Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes); // 塞入唯一的值 防止被其他線程誤解鎖 String requestId = UUID.randomUUID().toString(); boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100); if(lockGetFlag){ // 成功獲取鎖 進(jìn)行業(yè)務(wù)處理 //TODO // 處理完畢釋放鎖 boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId); }else{ // 未能獲得鎖放入等待隊(duì)列 RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param)); } }catch(Exception e){ break; } }
2.處理最新接到的數(shù)據(jù)
同樣是走嘗試獲取鎖,獲取不到放入隊(duì)列的流程
一般序列化用 fastJson 之列的就可以了,這里用的是 JDK 自帶的,工具類如下
public class ObjectSerialUtil { private ObjectSerialUtil() { // 工具類 } /** * 將Object對(duì)象序列化為byte[] * * @param obj 對(duì)象 * @return byte數(shù)組 * @throws Exception */ public static byte[] objectToBytes(Object obj) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); byte[] bytes = bos.toByteArray(); bos.close(); oos.close(); return bytes; } /** * 將bytes數(shù)組還原為對(duì)象 * * @param bytes * @return * @throws Exception */ public static Object bytesToObject(byte[] bytes) { try { ByteArrayInputStream bin = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bin); return ois.readObject(); } catch (Exception e) { throw new BaseException("反序列化出錯(cuò)!", e); } } }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Redis分布式鎖Redlock的實(shí)現(xiàn)
本文主要介紹了Redis分布式鎖Redlock的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08Redis中?HyperLogLog數(shù)據(jù)類型使用小結(jié)
Redis使用HyperLogLog的主要作用是在大數(shù)據(jù)流(view,IP,城市)的情況下進(jìn)行去重計(jì)數(shù),這篇文章主要介紹了Redis中?HyperLogLog數(shù)據(jù)類型使用總結(jié),需要的朋友可以參考下2023-03-03redis key過(guò)期監(jiān)聽的實(shí)現(xiàn)示例
在Redis中,我們可以為Key設(shè)置過(guò)期時(shí)間,當(dāng)Key的過(guò)期時(shí)間到達(dá)后,Redis會(huì)自動(dòng)將該Key標(biāo)記為已失效,本文就來(lái)介紹一下redis key過(guò)期監(jiān)聽的實(shí)現(xiàn)示例,感興趣的可以了解一下2024-03-03RedisDesktopManager遠(yuǎn)程連接redis的實(shí)現(xiàn)
本文主要介紹了RedisDesktopManager遠(yuǎn)程連接redis的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05window下創(chuàng)建redis出現(xiàn)問題小結(jié)
這篇文章主要介紹了window下創(chuàng)建redis出現(xiàn)問題總結(jié),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10詳解Redis中的List是如何實(shí)現(xiàn)的
List 的 Redis 中的 5 種主要數(shù)據(jù)結(jié)構(gòu)之一,它是一種序列集合,可以存儲(chǔ)一個(gè)有序的字符串列表,順序是插入的順序,本文將給大家介紹了一下Redis中的List是如何實(shí)現(xiàn)的,需要的朋友可以參考下2024-05-05如何使用redis中的zset實(shí)現(xiàn)滑動(dòng)窗口限流
滑動(dòng)窗口限流是一種常見的流量控制方法,它限制了在一定時(shí)間窗口內(nèi)的請(qǐng)求數(shù)量,下面是使用Redis ZSet實(shí)現(xiàn)滑動(dòng)窗口限流的一個(gè)簡(jiǎn)單示例,需要的朋友可以參考下2023-09-09