SpringBoot整合canal實現(xiàn)數(shù)據(jù)緩存一致性解決方案
1.前言
canal [k?'næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費。其誕生的背景是早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業(yè)務需求,實現(xiàn)方式主要是基于業(yè)務 trigger 獲取增量變更。從 2010 年開始,業(yè)務逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費業(yè)務。所以其核心功能如下:
- 數(shù)據(jù)實時備份
- 異構數(shù)據(jù)源(elasticsearch、Hbase)與數(shù)據(jù)庫數(shù)據(jù)增量同步
- 業(yè)務緩存cache 刷新,保證緩存一致性
- 帶業(yè)務邏輯的增量數(shù)據(jù)處理,如監(jiān)聽某個數(shù)據(jù)的變化做一定的邏輯處理
原理實現(xiàn)圖如下所示
canal是借助于MySQL主從復制原理實現(xiàn),所以我們接下來先來了解一下主從復制原理。
大概流程可以理解為如下:
- master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log);
- slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。
canal的工作原理:
原理相對比較簡單:
- canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始為byte流)
canal組件架構實現(xiàn)
我們可以大概看看canal是怎么實現(xiàn),其內(nèi)部的組件抽取、封裝及其對應的功能實現(xiàn),以便我們后續(xù)在使用上更加得心應手。
說明:
- server代表一個canal運行實例,對應于一個jvm
- instance對應于一個數(shù)據(jù)隊列 (1個server對應1..n個instance)
instance模塊:
- eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進行交互,協(xié)議解析)
- eventSink (Parser和Store鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作)
- eventStore (數(shù)據(jù)存儲)
- metaManager (增量訂閱&消費信息管理器)
2.canal部署安裝
上面我們知道canal是通過把自己偽裝成mysql slave,收集binlog做解析,然后再進行后續(xù)同步操作。所以我們的準備工作必須要求MySQL開啟binlog日志:
[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
當然也可以不用新增賬號,直接使用root賬號,為了方便快捷,我下面的案例就是使用root賬號的哦,當然這不符合開發(fā)規(guī)范,root權限賬號一般人不能用的~
接下來就是安裝canal了,安裝方式主要分為直接下載安裝包在服務器通過命令運行和使用docker容器化方式部署,docker容器部署雖然簡單快捷,但是考慮到不是人人都了解docker,所以我們這里采用直接使用安裝包命令運行。
安裝包命令運行其實很簡單,官網(wǎng)教程步驟也很詳細:https://github.com/alibaba/canal/wiki/QuickStart,這里我下載version為1.1.5的,在官網(wǎng)下載安裝包解壓之后文件如下:
bin canal.deployer-1.1.5.tar.gz conf lib logs plugin
主要看看conf目錄下配置文件:
canal.properties example logback.xml metrics spring
canal.properties是啟動canal server的配置文件:這里面有很多配置,我粘貼部分來講講
################################################# ######### destinations ############# ################################################# canal.destinations = example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 # set this value to 'true' means that when binlog pos not found, skip to latest. # WARN: pls keep 'false' in production env, or if you know what you want. canal.auto.reset.latest.pos.mode = false ? canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml ? canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ?
canal.destinations = example就是指定instance實例的查找位置,如果我們一個canal server需要監(jiān)聽多個instance(平時各個業(yè)務線的數(shù)據(jù)庫都是獨立的如商品product,倉庫warehouse),一個instance監(jiān)聽一個數(shù)據(jù)庫,這是最常見的需求了,這時候我就需要配置多個instance,可以直接把example文件夾拷貝兩份,分別用數(shù)據(jù)庫命名新文件夾這樣方便我們快速了解該文件夾對應的instance是哪個業(yè)務線的。然后就是調(diào)整canal.properties
canal.destinations = product,warehouse
緊接著就是修改每個instance文件下的instance.properties,適配監(jiān)聽的數(shù)據(jù)庫配置信息
## mysql serverId , v1.0.26+ will autoGen ## v1.0.26版本后會自動生成slaveId,所以可以不用配置 # canal.instance.mysql.slaveId=0 ? # 數(shù)據(jù)庫地址 canal.instance.master.address=127.0.0.1:3306 # binlog日志名稱 canal.instance.master.journal.name=mysql-bin.000001 # mysql主庫鏈接時起始的binlog偏移量 canal.instance.master.position=154 # mysql主庫鏈接時起始的binlog的時間戳 canal.instance.master.timestamp= canal.instance.master.gtid= ? # username/password # 在MySQL服務器授權的賬號密碼 canal.instance.dbUsername=canal canal.instance.dbPassword=Canal@123456 # 字符集 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false ? # table regex .*\..*表示監(jiān)聽所有表 也可以寫具體的表名,用,隔開 canal.instance.filter.regex=.*\..* # mysql 數(shù)據(jù)解析表的黑名單,多個表用,隔開 canal.instance.filter.black.regex= ?
最后在安裝包目錄下執(zhí)行以下命令就可以啟動了:
sh bin/startup.sh
是不是很簡單?。?! 但是你有沒有發(fā)現(xiàn)這種方式每新增一個instance,都需要修改配置文件并重啟,這樣會導致數(shù)據(jù)同步中斷不太友好,而且也沒有canal server服務的狀態(tài)監(jiān)控,著實覺得這框架不夠完善。阿里巴巴也考慮到了這些問題,所以提供了canal-admin,canal-admin設計上是為canal提供整體配置管理、節(jié)點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作。注意:canal-admin有以下限制要求:
MySQL,用于存儲配置和節(jié)點等相關數(shù)據(jù) canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動態(tài)運維管理接口)
在官網(wǎng)下載canal-admin的安裝包解壓如下:
bin canal.admin-1.1.5.tar.gz conf lib logs
直接來看conf下的文件:
application.yml canal_manager.sql canal-template.properties instance-template.properties logback.xml public
這里看到的就是一個spring boot框架開發(fā)的web項目啦,anal_manager.sql就是canal-admin服務所依賴的數(shù)據(jù)庫初始化腳本,我們得去MySQL執(zhí)行,然后修改配置文件application.yml
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 ? spring.datasource: address: 10.10.0.10:3306 database: canal_manager username: root password: root driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 ? canal: adminUser: admin adminPasswd: admin ?
這里就配置一下前面執(zhí)行SQL腳本數(shù)據(jù)庫的連接信息即可,當然如果端口8089被占用了就改成別的,到時候canal server配置對應的就行。在canal-admin的目錄執(zhí)行下面命令就能啟動了:
sh bin/startup.sh
這時候通過主機ip:8089就能在瀏覽器訪問:
默認登錄用戶名密碼:admin/123456,成功進入之后:
我們可以通過界面管理canal集群、canal server 、server下的instance。這樣無論是我們修改instance的配置還是新增一個instance都不需要去服務器操作并重啟服務了,是不是很方便,直接通過界面操作修改、重啟即可。
當然還是需要像一開始一樣在服務器啟動canal server的,需要把配置canal.properties改成如下:
# register ip canal.register.ip = ? # canal admin config canal.admin.manager = 10.10.0.10:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
這里最主要是綁定關聯(lián)canal-admin,配置admin的地址信息。這里提一下canal.register.ip這個配置是和canal集群有關的,canal集群是依靠zookeeper實現(xiàn),這里就不展開細講了。成功啟動canal server之后,就可以在admin界面看到了:
然后我們可以基于canal server新增instance:mall和fast-api
這時候你來查看canal server 下的配置目錄conf下:
canal.properties example fast-api logback.xml mall metrics spring
發(fā)現(xiàn)多了兩個目錄mall和fast-api,這就是對應我們前面在界面上創(chuàng)建的兩個instance,admin通過關聯(lián)canal server自動幫我們生成,是不是很完美?。?!
3.Spring Boot整合canal
3.1數(shù)據(jù)庫與緩存一致性問題概述
自有了緩存的那一天起,緩存與數(shù)據(jù)庫數(shù)據(jù)一致性問題就一直伴隨著后端開發(fā)者,所以如何保證數(shù)據(jù)庫與緩存雙寫數(shù)據(jù)一致性也成為了面試的一個高頻考點。對于緩存的更新肯定來自于業(yè)務的觸發(fā),且最終的邏輯處理數(shù)據(jù)是需要落庫的,只是我們需要考慮的是先更新DB還是先更新緩存?是更新緩存還是刪除緩存?在常規(guī)情況下,怎么操作都可以,但一旦面對高并發(fā)場景,就值得細細思量了。 接下來我們就來分別看看先寫數(shù)據(jù)庫或者先寫緩存有啥問題?
這里我們假設我們的某個業(yè)務功能請求就是修改某個數(shù)據(jù)值
先寫 MySQL,再寫 Redis
請求 A、B 都是先寫 MySQL,然后再寫 Redis,在高并發(fā)情況下,如果請求 A 在寫 Redis 時卡了一會,請求 B 已經(jīng)依次完成數(shù)據(jù)的更新,就會出現(xiàn)圖中的問題。并發(fā)場景下,這樣的情況是很容易出現(xiàn)的,每個線程的操作先后順序不同,這樣就導致請求B的緩存值被請求A給覆蓋了,數(shù)據(jù)庫中是線程B的新值,緩存中是線程A的舊值,并且會一直這么臟下去直到緩存失效(如果你設置了過期時間的話)。
先寫Redis,再寫MySQL
和上面一樣只是調(diào)換了寫入數(shù)據(jù)庫與緩存的順序,直接看圖:
高并發(fā)場景下一樣有數(shù)據(jù)一致性問題。
還有數(shù)據(jù)更新刪除緩存、延時雙刪緩存等解決一致性問題方式,這里就不一一列舉了,本質上上面我只是引出數(shù)據(jù)庫與緩存雙寫一致性問題,畢竟我們今天主題是canal,不是雙寫一致性問題解決方案詳解,有興趣可以自行查閱這個高頻面試知識。
3.2 整合canel
canal官方?jīng)]有提供與spring-boot框架快速整合的starter,根據(jù)官網(wǎng)示例直接使用canal client直連canal server操作:
引入依賴:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency>
示例:我們在上面的canal admin創(chuàng)建一個mall的實例監(jiān)聽mall數(shù)據(jù)庫變化,接下來我通過新增、修改、刪除品牌的一條數(shù)據(jù)。
package com.shepherd.common.canal; ? import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; ? import java.net.InetSocketAddress; import java.util.List; ? public class CanalClient { ? public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException { ? // 創(chuàng)建canal客戶端,單鏈接模式 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.10.0.10", 11111), "mall", "", ""); // 創(chuàng)建連接 canalConnector.connect(); while (true) { // 訂閱數(shù)據(jù)庫 // canalConnector.subscribe("mall"); ? // 獲取數(shù)據(jù) Message message = canalConnector.get(100); ? // 獲取Entry集合 List<CanalEntry.Entry> entries = message.getEntries(); ? // 判斷集合是否為空,如果為空,則等待一會繼續(xù)拉取數(shù)據(jù) if (entries.size() <= 0) { // System.out.println("當次抓取沒有數(shù)據(jù),休息一會。。。。。。"); Thread.sleep(1000); } else { // 遍歷entries,單條解析 for (CanalEntry.Entry entry : entries) { ? //1.獲取表名 String tableName = entry.getHeader().getTableName(); ? //2.獲取類型 CanalEntry.EntryType entryType = entry.getEntryType(); ? //3.獲取序列化后的數(shù)據(jù) ByteString storeValue = entry.getStoreValue(); ? //4.判斷當前entryType類型是否為ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { ? //5.反序列化數(shù)據(jù) CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); ? //6.獲取當前事件的操作類型 CanalEntry.EventType eventType = rowChange.getEventType(); ? //7.獲取數(shù)據(jù)集 List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); ? //8.遍歷rowDataList,并打印數(shù)據(jù)集 for (CanalEntry.RowData rowData : rowDataList) { ? JSONObject beforeData = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } ? JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } ? //數(shù)據(jù)打印 System.out.println("Table:" + tableName + ",EventType:" + eventType + ",Before:" + beforeData + ",After:" + afterData); } } } } } } } ? ?
控制臺打印如下:
? Table:brand,EventType:INSERT,Before:{},After:{"image":"","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei","description":"世界第一名族企業(yè)","id":"1","is_delete":""} ? Table:brand,EventType:UPDATE,Before:{"image":"","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei","description":"世界第一名族企業(yè)","id":"1","is_delete":""},After:{"image":"http://www.baidu.com/image1.png","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei111","description":"世界第一名族企業(yè)","id":"1","is_delete":""} ? Table:brand,EventType:DELETE,Before:{"image":"http://www.baidu.com/image1.png","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei111","description":"世界第一名族企業(yè)","id":"1","is_delete":""},After:{} ?
可以看到canal監(jiān)控到表數(shù)據(jù)的變更方式以及數(shù)據(jù)的前后變化。這樣我們就可以通過canal監(jiān)聽MySQL binlog原理優(yōu)雅實現(xiàn)緩存與數(shù)據(jù)庫數(shù)據(jù)一致性解決方案啦,通過的監(jiān)聽得到數(shù)據(jù)信息進行緩存同步寫操作即可。
4.總結
canal是一個增量數(shù)據(jù)同步組件,其好處就是在于對業(yè)務邏輯無侵入,它是通過把自己偽裝成mysql slave收集binlog實現(xiàn)數(shù)據(jù)同步的。這里要強調(diào)一下異構數(shù)據(jù)源之間要實現(xiàn)數(shù)據(jù)增量同步,同時要保證實時性、低延時在大數(shù)據(jù)領域也是一個令人頭疼的問題,不像全量同步簡單直接全部數(shù)據(jù)寫到目的源就可以了。canal就是為了解決增量同步而生,這是其招牌。當然canal也是有缺點的,只能監(jiān)聽MySQL,其他數(shù)據(jù)庫oracle就不行了。
同時也要指明企業(yè)級的數(shù)據(jù)同步不可能像上面的方式一條條監(jiān)聽數(shù)據(jù)變化同步異構數(shù)據(jù)源如elasticsearch、Hbase的,因為一條條處理同步速度之慢不言而喻,當然通過上面方式同步數(shù)據(jù)到緩存redis是可以的,因為緩存的數(shù)據(jù)一般變化不頻繁且數(shù)據(jù)量不大,但同步其他大數(shù)據(jù)組件就一般都需要批量同步了,這時候就需要借助消息隊列中間件如kafka進行數(shù)據(jù)堆積從而實現(xiàn)批量同步,canal+kafka+elasticsearch這個架構是當下許多企業(yè)中較常見的一種數(shù)據(jù)同步的方案。
以上就是SpringBoot整合canal實現(xiàn)數(shù)據(jù)緩存一致性解決方案的詳細內(nèi)容,更多關于SpringBoot canal數(shù)據(jù)緩存一致性的資料請關注腳本之家其它相關文章!
相關文章
使用springboot在工具類中讀取配置文件(ClassPathResource)
這篇文章主要介紹了使用springboot在工具類中讀取配置文件(ClassPathResource),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08關于springboot配置druid數(shù)據(jù)源不生效問題(踩坑記)
今天日常跟著網(wǎng)課學習,學到了整合druid數(shù)據(jù)源,遇到了好幾個坑,希望這篇文章可以幫助一些和我一樣踩坑的人2021-09-09java普通項目讀取不到resources目錄下資源文件的解決辦法
這篇文章主要給大家介紹了關于java普通項目讀取不到resources目錄下資源文件的解決辦法,Web項目中應該經(jīng)常有這樣的需求,在maven項目的resources目錄下放一些文件,比如一些配置文件,資源文件等,需要的朋友可以參考下2023-09-09SpringBoot項目創(chuàng)建使用+配置文件+日志文件詳解
Spring的出現(xiàn)是為了簡化 Java 程序開發(fā),而 SpringBoot 的出現(xiàn)是為了簡化 Spring 程序開發(fā),這篇文章主要介紹了SpringBoot項目創(chuàng)建使用+配置文件+日志文件,需要的朋友可以參考下2023-02-02