Redis和數(shù)據(jù)庫的一致性(Canal+MQ) 的實(shí)現(xiàn)
想要保證緩存與數(shù)據(jù)庫的雙寫一致,一共有4種方式,即4種同步策略:
- 先更新緩存,再更新數(shù)據(jù)庫;
- 先更新數(shù)據(jù)庫,再更新緩存;
- 先刪除緩存,再更新數(shù)據(jù)庫;
- 先更新數(shù)據(jù)庫,再刪除緩存
首先說好結(jié)論,這4種同步策略無論是哪一種,都無法保證數(shù)據(jù)庫和redis的強(qiáng)一致性,只能保證最終一致性,如要保證強(qiáng)一致,那么只能通過加鎖來實(shí)現(xiàn),那么就會(huì)造成性能問題,即CAP理論中的AP(強(qiáng)一致)和CP(高可用性)進(jìn)行取舍,絕大多數(shù)場(chǎng)景是確保高可用(CP)。
更新緩存還是刪除緩存
下面,我們來分析一下,應(yīng)該采用更新緩存還是刪除緩存的方式。
1、更新緩存
優(yōu)點(diǎn):每次數(shù)據(jù)變化都及時(shí)更新緩存,所以查詢時(shí)不容易出現(xiàn)未命中的情況。
缺點(diǎn):更新緩存的消耗比較大。如果數(shù)據(jù)需要經(jīng)過復(fù)雜的計(jì)算再寫入緩存,那么頻繁的更新緩存,就會(huì)影響服務(wù)器的性能。如果是寫入數(shù)據(jù)頻繁的業(yè)務(wù)場(chǎng)景,那么可能頻繁的更新緩存時(shí),卻沒有業(yè)務(wù)讀取該數(shù)據(jù)。
2、刪除緩存
優(yōu)點(diǎn):操作簡(jiǎn)單,無論更新操作是否復(fù)雜,都是將緩存中的數(shù)據(jù)直接刪除。
缺點(diǎn):刪除緩存后,下一次查詢緩存會(huì)出現(xiàn)未命中,這時(shí)需要重新讀取一次數(shù)據(jù)庫。從上面的比較來看,一般情況下,刪除緩存是更優(yōu)的方案。
先操作數(shù)據(jù)庫還是更新緩存
1.先更新數(shù)據(jù)庫再刪除緩存
- 線程A更新數(shù)據(jù)庫成功,線程A刪除緩存失??;
- 線程B讀取緩存成功,由于緩存刪除失敗,所以線程B讀取到的是緩存中舊的數(shù)據(jù)。
- 最后線程A刪除緩存成功,有別的線程訪問緩存同樣的數(shù)據(jù),與數(shù)據(jù)庫中的數(shù)據(jù)是一樣。
- 最終,緩存和數(shù)據(jù)庫的數(shù)據(jù)是一致的,但是會(huì)有一些線程讀到舊的數(shù)據(jù)。
1.2正常情況下沒有出現(xiàn)失敗場(chǎng)景
在并發(fā)場(chǎng)景下,也許會(huì)有些許線程像線程b一樣讀的是舊數(shù)據(jù),但在刪除緩存后,最終緩存與數(shù)據(jù)庫的數(shù)據(jù)是一致的,并且都是最新的數(shù)據(jù)。但線程B在這個(gè)過程里讀到了舊的數(shù)據(jù),可能還有其他線程也像線程B一樣,在這兩步之間讀到了緩存中舊的數(shù)據(jù),但因?yàn)檫@兩步的執(zhí)行速度會(huì)比較快,所以影響不大。對(duì)于這兩步之后,其他進(jìn)程再讀取緩存數(shù)據(jù)的時(shí)候,就不會(huì)出現(xiàn)類似于進(jìn)程B的問題了。
2.先刪除緩存再更新數(shù)據(jù)庫
- 線程A刪除緩存成功,線程A更新數(shù)據(jù)庫失??;
- 線程B從緩存中讀取數(shù)據(jù);由于緩存被刪,進(jìn)程B無法從緩存中得到數(shù)據(jù),進(jìn)而從數(shù)據(jù)庫讀取數(shù)據(jù);此時(shí)數(shù)據(jù)庫中的數(shù)據(jù)更新失敗,線程B從數(shù)據(jù)庫成功獲取舊的數(shù)據(jù),然后將數(shù)據(jù)更新到了緩存。
- 最終,如果沒有異步重試的話緩存和數(shù)據(jù)庫的數(shù)據(jù)是一致的,但仍然是舊的數(shù)據(jù)。
2.2正常情況下沒有出現(xiàn)失敗場(chǎng)景
進(jìn)程A的兩步操作均成功,但由于存在并發(fā),在這兩步之間,進(jìn)程B訪問了緩存。最終結(jié)果是,緩存中存儲(chǔ)了舊的數(shù)據(jù),而數(shù)據(jù)庫中存儲(chǔ)了新的數(shù)據(jù),二者數(shù)據(jù)不一致。
這種方式的解決方案也就是在第2步更新數(shù)據(jù)庫后,延遲一會(huì)再刪一次Redis,也就是延遲雙刪,這樣就可以保證最終數(shù)據(jù)一致性。
最終結(jié)論:
經(jīng)過對(duì)比你會(huì)發(fā)現(xiàn),先更新數(shù)據(jù)庫、再刪除緩存是影響更小的方案。如果第二步出現(xiàn)失敗的情況,則可以采用重試機(jī)制解決問題。
最終解決方案
利用(MQ)消息隊(duì)列和Canal中間件進(jìn)行刪除的補(bǔ)償
Canal目前在大型企業(yè)中熱度下降,使用flinkcdc是目前的趨勢(shì),而目前主流CDC(變更數(shù)據(jù)獲?。┦?strong>flink cdc 而flinkcdc插件是基于flink平臺(tái)(大數(shù)據(jù)平臺(tái))此處只需要簡(jiǎn)單理解Canal作用并簡(jiǎn)單實(shí)現(xiàn)即可。目前企業(yè)中常見的數(shù)據(jù)同步方案就是CDC中間件+MQ的方案,大型公司一般是有大數(shù)據(jù)業(yè)務(wù),所以使用大數(shù)據(jù)平臺(tái)和kafka,此處使用的是Canal+Rabbitmq
Canal安裝與部署
Mysql前置準(zhǔn)備
在服務(wù)中找到Mysql配置文件對(duì)應(yīng)目錄
其中一共需要注意四個(gè)配置項(xiàng)
server-id=1 #master端的ID號(hào)【必須是唯一的】; log-bin=D:\MySQL\binlog\mysql-bin.log #同步的日志路徑,一定注意這個(gè)目錄要是mysql有權(quán)限寫入的 binlog_format=row #行級(jí),記錄每次操作后每行記錄的變化。 binlog-do-db=db_xiaomi #指定庫,縮小監(jiān)控的范圍。
1.查看端口號(hào)配置對(duì)應(yīng)主要用于集群環(huán)境下區(qū)分id
2.創(chuàng)建binlog文件存放目錄
3.數(shù)據(jù)的保存格式(一共有三種)
4.指定需要監(jiān)控的庫名(如果該項(xiàng)不指定配置,那么默認(rèn)所有數(shù)據(jù)庫開啟binlog)
設(shè)置好后啟動(dòng)服務(wù)。
啟動(dòng)后看到在binlog文件目錄中看到log文件
??Mysql的binlog日志三種格式:
Canal默認(rèn)選擇的是ROW
- STATEMENT:基于SQL語句的復(fù)制(statement-based replication, SBR)
- ROW:基于行的復(fù)制(row-based replication, RBR)
- MIXED:混合模式復(fù)制(mixed-based replication, MBR)
官網(wǎng)下載地址:Release v1.1.7 · alibaba/canal · GitHub
修改Mysql示例配置文件
修改連接數(shù)據(jù)庫授權(quán)的用戶和密碼
配置好后在bin目錄中執(zhí)行啟動(dòng)命令文件
到此完成
JAVA項(xiàng)目整合Canal
引入依賴
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
測(cè)試demo
public static void main(String[] args) throws InvalidProtocolBufferException { CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", ""); while (true) { //2.獲取連接 canalConnector.connect(); //3.指定要監(jiān)控的數(shù)據(jù)庫 canalConnector.subscribe("db.xiaomi.*"); //4.獲取 Message Message message = canalConnector.get(100); List<CanalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0) { System.out.println("沒有數(shù)據(jù),休息一會(huì)"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { for (CanalEntry.Entry entry : entries) { // 獲取表名 String tableName = entry.getHeader().getTableName(); // Entry 類型 CanalEntry.EntryType entryType = entry.getEntryType(); // 判斷 entryType 是否為 ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // 序列化數(shù)據(jù) ByteString storeValue = entry.getStoreValue(); // 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 獲取事件類型 CanalEntry.EventType eventType = rowChange.getEventType(); // 獲取具體的數(shù)據(jù) List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // 遍歷并打印數(shù)據(jù) for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); Map<String, Object> bMap = new HashMap<>(); for (CanalEntry.Column column : beforeColumnsList) { bMap.put(column.getName(), column.getValue()); } Map<String, Object> afMap = new HashMap<>(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afMap.put(column.getName(), column.getValue()); } System.out.println("表名:" + tableName + ",操作類型:" + eventType); System.out.println("改前:" + bMap); System.out.println("改后:" + afMap); } } } } } }
JAVA項(xiàng)目整合RabbitMQ
?最終解決思路
即先更新數(shù)據(jù)庫,然后在刪除緩存,更新數(shù)據(jù)庫后通過Canal拉去MySQL的binlog日志,將更新消息放入MQ,由MQ異步執(zhí)行刪除操作。
業(yè)務(wù)思路:
在下訂后將庫存數(shù)據(jù)庫更新,根據(jù)商品的id值進(jìn)行更新緩存和es;
業(yè)務(wù)思路圖
?具體實(shí)現(xiàn)
那么Canal類的代碼如上圖所示,因?yàn)樗潜O(jiān)聽功能,那么就要一直啟動(dòng)保持運(yùn)行,目前是將Canal類放在頁面訪問服務(wù)實(shí)例中,那么在SpringBoot的Application啟動(dòng)時(shí)應(yīng)該也要將Canal啟動(dòng)。Spring提供兩種方式實(shí)現(xiàn)CommandLineRunner接口或@PostConstruct注解來實(shí)現(xiàn)。
此處以實(shí)現(xiàn)CommandLineRunner接口為例,只適合類似于初始化一些數(shù)據(jù)
??此處不適合使用上面的類,應(yīng)為其中使用了while(true)中寫了個(gè)死循環(huán)一直運(yùn)行,這樣就會(huì)導(dǎo)致啟動(dòng)類啟動(dòng)后執(zhí)行這個(gè)類而導(dǎo)致一直阻塞在這里。如果要使用那么應(yīng)該是在一個(gè)單獨(dú)的服務(wù)模塊中就可以這樣使用。
/**Canal監(jiān)聽類 * @author 12547 * @version 1.0 * @Date 2024/3/19 20:44 */ @Component public class CanalRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { System.out.println(">>>>>>>此處并不適用Canal的運(yùn)行方式<<<<<<<<<<"); System.out.println(">>>>>>>應(yīng)該是單獨(dú)起一個(gè)線程<<<<<<<<<<"); } }
可以看到在啟動(dòng)SpringBoot實(shí)例后,執(zhí)行了該方法。
??bug解決
但在編寫其他測(cè)試類的時(shí)候發(fā)現(xiàn)其一直阻塞在這里而不執(zhí)行測(cè)試代碼,推測(cè)其一直阻塞線程(因?yàn)橛兴姥h(huán))。
解決方案
將其改為異步形式執(zhí)行。將其改為@Async異步執(zhí)行。
?@Async的使用
應(yīng)為Async用到線程池相關(guān),所以先自定義一個(gè)用于異步的線程池
/** 自定義線程池 bean 用于Async異步調(diào)用 * @author 12547 * @version 1.0 * @Date 2024/3/20 15:45 */ @Configuration @EnableAsync public class AsyncConfig { /** * 自定義線程 * @return */ @Bean("asyncPoll") public ThreadPoolTaskExecutor asyncOperationExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù) executor.setCorePoolSize(8); // 設(shè)置最大線程數(shù) executor.setMaxPoolSize(20); // 設(shè)置隊(duì)列大小 executor.setQueueCapacity(Integer.MAX_VALUE); // 設(shè)置線程活躍時(shí)間(秒) executor.setKeepAliveSeconds(60); // 設(shè)置線程名前綴+分組名稱 executor.setThreadNamePrefix("AsyncOperationThread-"); executor.setThreadGroupName("AsyncOperationGroup"); // 所有任務(wù)結(jié)束后關(guān)閉線程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 初始化 executor.initialize(); return executor; } }
將Canal的運(yùn)行代碼改為異步執(zhí)行
在啟動(dòng)類后通過調(diào)用使其異步執(zhí)行即可。并經(jīng)過測(cè)試后不再影響測(cè)試類的使用。
那么在Canal監(jiān)聽類中,當(dāng)監(jiān)聽到數(shù)據(jù)變化后,將變化發(fā)送給MQ消息
消費(fèi)者監(jiān)聽類
/**異步數(shù)據(jù)更新Redis類 * @author 12547 * @version 1.0 * @Date 2024/3/20 15:49 */ @Component public class RedisDataListenerService { @Autowired private CacheService cacheService; @Autowired private StringRedisTemplate redisTemplate; /** * Redis數(shù)據(jù)更新消費(fèi)者監(jiān)聽方法 */ @RabbitListener(queues = MqConstants.QUEUE_NAME) public void updateRedisDataByAsync(Map<String,Object> msg){ System.out.println("監(jiān)聽到數(shù)據(jù)變化:"); System.out.println("數(shù)據(jù)變化商品id:"+msg.get("id")); //正常情況Redis應(yīng)該每個(gè)商品id一個(gè)key TODO 需要改造詳情緩存查詢將List<phone>改為單獨(dú)的一個(gè)phone對(duì)象 redisTemplate.opsForHash().putAll(msg.get("id").toString(),msg); System.out.println(msg.get("id").toString()); System.out.println(cacheService.getHashCache(msg.get("id").toString(), "num")); } }
與Redis實(shí)現(xiàn)數(shù)據(jù)同步基本demo到這差不多了已經(jīng),后續(xù)可以結(jié)合項(xiàng)目進(jìn)一步優(yōu)化
到此這篇關(guān)于Redis和數(shù)據(jù)庫的一致性(Canal+MQ)的文章就介紹到這了,更多相關(guān)Redis和數(shù)據(jù)庫一致性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis實(shí)現(xiàn)優(yōu)惠券限一單限制詳解
這篇文章主要介紹了Redis解決優(yōu)惠券秒殺應(yīng)用案例,本文先講了搶購問題,指出其中會(huì)出現(xiàn)的多線程問題,提出解決方案采用悲觀鎖和樂觀鎖兩種方式進(jìn)行實(shí)現(xiàn),然后發(fā)現(xiàn)在搶購過程中容易出現(xiàn)一人多單現(xiàn)象,需要的朋友可以參考下2022-12-12Win10配置redis服務(wù)實(shí)現(xiàn)過程詳解
這篇文章主要介紹了Win10配置redis服務(wù)實(shí)現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07redis發(fā)布和訂閱_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了redis發(fā)布和訂閱的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08