亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot整合canal實現(xiàn)數(shù)據(jù)緩存一致性解決方案

 更新時間:2024年03月15日 09:19:18   作者:牽著貓散步的鼠鼠  
canal主要用途是基于?MySQL?數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費,canal是借助于MySQL主從復制原理實現(xiàn),本文將給大家介紹SpringBoot整合canal實現(xiàn)數(shù)據(jù)緩存一致性解決方案,需要的朋友可以參考下

1.前言

canal [k?'næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費。其誕生的背景是早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業(yè)務需求,實現(xiàn)方式主要是基于業(yè)務 trigger 獲取增量變更。從 2010 年開始,業(yè)務逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費業(yè)務。所以其核心功能如下:

  • 數(shù)據(jù)實時備份
  • 異構數(shù)據(jù)源(elasticsearch、Hbase)與數(shù)據(jù)庫數(shù)據(jù)增量同步
  • 業(yè)務緩存cache 刷新,保證緩存一致性
  • 帶業(yè)務邏輯的增量數(shù)據(jù)處理,如監(jiān)聽某個數(shù)據(jù)的變化做一定的邏輯處理

原理實現(xiàn)圖如下所示

canal是借助于MySQL主從復制原理實現(xiàn),所以我們接下來先來了解一下主從復制原理。

大概流程可以理解為如下: 

  • master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
  • slave將master的binary log events拷貝到它的中繼日志(relay log);
  • slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。

canal的工作原理:

原理相對比較簡單:

  • canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
  • mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  • canal解析binary log對象(原始為byte流)

canal組件架構實現(xiàn)

我們可以大概看看canal是怎么實現(xiàn),其內(nèi)部的組件抽取、封裝及其對應的功能實現(xiàn),以便我們后續(xù)在使用上更加得心應手。

說明:

  • server代表一個canal運行實例,對應于一個jvm
  • instance對應于一個數(shù)據(jù)隊列 (1個server對應1..n個instance)

instance模塊:

  • eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進行交互,協(xié)議解析)
  • eventSink (Parser和Store鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作)
  • eventStore (數(shù)據(jù)存儲)
  • metaManager (增量訂閱&消費信息管理器)

2.canal部署安裝

上面我們知道canal是通過把自己偽裝成mysql slave,收集binlog做解析,然后再進行后續(xù)同步操作。所以我們的準備工作必須要求MySQL開啟binlog日志:

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant 

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

當然也可以不用新增賬號,直接使用root賬號,為了方便快捷,我下面的案例就是使用root賬號的哦,當然這不符合開發(fā)規(guī)范,root權限賬號一般人不能用的~

接下來就是安裝canal了,安裝方式主要分為直接下載安裝包在服務器通過命令運行和使用docker容器化方式部署,docker容器部署雖然簡單快捷,但是考慮到不是人人都了解docker,所以我們這里采用直接使用安裝包命令運行。

安裝包命令運行其實很簡單,官網(wǎng)教程步驟也很詳細:https://github.com/alibaba/canal/wiki/QuickStart,這里我下載version為1.1.5的,在官網(wǎng)下載安裝包解壓之后文件如下:

bin  canal.deployer-1.1.5.tar.gz  conf  lib  logs  plugin

主要看看conf目錄下配置文件:

canal.properties    example    logback.xml    metrics  spring

canal.properties是啟動canal server的配置文件:這里面有很多配置,我粘貼部分來講講

#################################################
#########               destinations            #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
?
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
?
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
?

canal.destinations = example就是指定instance實例的查找位置,如果我們一個canal server需要監(jiān)聽多個instance(平時各個業(yè)務線的數(shù)據(jù)庫都是獨立的如商品product,倉庫warehouse),一個instance監(jiān)聽一個數(shù)據(jù)庫,這是最常見的需求了,這時候我就需要配置多個instance,可以直接把example文件夾拷貝兩份,分別用數(shù)據(jù)庫命名新文件夾這樣方便我們快速了解該文件夾對應的instance是哪個業(yè)務線的。然后就是調(diào)整canal.properties

canal.destinations = product,warehouse

緊接著就是修改每個instance文件下的instance.properties,適配監(jiān)聽的數(shù)據(jù)庫配置信息

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后會自動生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
?
# 數(shù)據(jù)庫地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名稱
canal.instance.master.journal.name=mysql-bin.000001
# mysql主庫鏈接時起始的binlog偏移量
canal.instance.master.position=154
# mysql主庫鏈接時起始的binlog的時間戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
?
# username/password
# 在MySQL服務器授權的賬號密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
?
# table regex .*\..*表示監(jiān)聽所有表 也可以寫具體的表名,用,隔開
canal.instance.filter.regex=.*\..*
# mysql 數(shù)據(jù)解析表的黑名單,多個表用,隔開
canal.instance.filter.black.regex=
?

最后在安裝包目錄下執(zhí)行以下命令就可以啟動了:

sh bin/startup.sh

是不是很簡單?。?! 但是你有沒有發(fā)現(xiàn)這種方式每新增一個instance,都需要修改配置文件并重啟,這樣會導致數(shù)據(jù)同步中斷不太友好,而且也沒有canal server服務的狀態(tài)監(jiān)控,著實覺得這框架不夠完善。阿里巴巴也考慮到了這些問題,所以提供了canal-admin,canal-admin設計上是為canal提供整體配置管理、節(jié)點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作。注意:canal-admin有以下限制要求:

MySQL,用于存儲配置和節(jié)點等相關數(shù)據(jù) canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動態(tài)運維管理接口)

在官網(wǎng)下載canal-admin的安裝包解壓如下:

bin  canal.admin-1.1.5.tar.gz  conf  lib  logs

直接來看conf下的文件:

application.yml  canal_manager.sql  canal-template.properties  instance-template.properties  logback.xml  public

這里看到的就是一個spring boot框架開發(fā)的web項目啦,anal_manager.sql就是canal-admin服務所依賴的數(shù)據(jù)庫初始化腳本,我們得去MySQL執(zhí)行,然后修改配置文件application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
?
spring.datasource:
  address: 10.10.0.10:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
?
canal:
  adminUser: admin
  adminPasswd: admin
?

這里就配置一下前面執(zhí)行SQL腳本數(shù)據(jù)庫的連接信息即可,當然如果端口8089被占用了就改成別的,到時候canal server配置對應的就行。在canal-admin的目錄執(zhí)行下面命令就能啟動了:

sh bin/startup.sh

 這時候通過主機ip:8089就能在瀏覽器訪問:

默認登錄用戶名密碼:admin/123456,成功進入之后: 

我們可以通過界面管理canal集群、canal server 、server下的instance。這樣無論是我們修改instance的配置還是新增一個instance都不需要去服務器操作并重啟服務了,是不是很方便,直接通過界面操作修改、重啟即可。

當然還是需要像一開始一樣在服務器啟動canal server的,需要把配置canal.properties改成如下:

# register ip
canal.register.ip =
?
# canal admin config
canal.admin.manager = 10.10.0.10:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = 

這里最主要是綁定關聯(lián)canal-admin,配置admin的地址信息。這里提一下canal.register.ip這個配置是和canal集群有關的,canal集群是依靠zookeeper實現(xiàn),這里就不展開細講了。成功啟動canal server之后,就可以在admin界面看到了:

然后我們可以基于canal server新增instance:mall和fast-api

這時候你來查看canal server 下的配置目錄conf下: 

canal.properties example fast-api logback.xml mall metrics spring

發(fā)現(xiàn)多了兩個目錄mall和fast-api,這就是對應我們前面在界面上創(chuàng)建的兩個instance,admin通過關聯(lián)canal server自動幫我們生成,是不是很完美?。?!

3.Spring Boot整合canal

3.1數(shù)據(jù)庫與緩存一致性問題概述

自有了緩存的那一天起,緩存與數(shù)據(jù)庫數(shù)據(jù)一致性問題就一直伴隨著后端開發(fā)者,所以如何保證數(shù)據(jù)庫與緩存雙寫數(shù)據(jù)一致性也成為了面試的一個高頻考點。對于緩存的更新肯定來自于業(yè)務的觸發(fā),且最終的邏輯處理數(shù)據(jù)是需要落庫的,只是我們需要考慮的是先更新DB還是先更新緩存?是更新緩存還是刪除緩存?在常規(guī)情況下,怎么操作都可以,但一旦面對高并發(fā)場景,就值得細細思量了。 接下來我們就來分別看看先寫數(shù)據(jù)庫或者先寫緩存有啥問題?

這里我們假設我們的某個業(yè)務功能請求就是修改某個數(shù)據(jù)值

先寫 MySQL,再寫 Redis

請求 A、B 都是先寫 MySQL,然后再寫 Redis,在高并發(fā)情況下,如果請求 A 在寫 Redis 時卡了一會,請求 B 已經(jīng)依次完成數(shù)據(jù)的更新,就會出現(xiàn)圖中的問題。并發(fā)場景下,這樣的情況是很容易出現(xiàn)的,每個線程的操作先后順序不同,這樣就導致請求B的緩存值被請求A給覆蓋了,數(shù)據(jù)庫中是線程B的新值,緩存中是線程A的舊值,并且會一直這么臟下去直到緩存失效(如果你設置了過期時間的話)。

先寫Redis,再寫MySQL

和上面一樣只是調(diào)換了寫入數(shù)據(jù)庫與緩存的順序,直接看圖:

高并發(fā)場景下一樣有數(shù)據(jù)一致性問題。

還有數(shù)據(jù)更新刪除緩存、延時雙刪緩存等解決一致性問題方式,這里就不一一列舉了,本質上上面我只是引出數(shù)據(jù)庫與緩存雙寫一致性問題,畢竟我們今天主題是canal,不是雙寫一致性問題解決方案詳解,有興趣可以自行查閱這個高頻面試知識。

3.2 整合canel

canal官方?jīng)]有提供與spring-boot框架快速整合的starter,根據(jù)官網(wǎng)示例直接使用canal client直連canal server操作:

引入依賴

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>

示例:我們在上面的canal admin創(chuàng)建一個mall的實例監(jiān)聽mall數(shù)據(jù)庫變化,接下來我通過新增、修改、刪除品牌的一條數(shù)據(jù)。

package com.shepherd.common.canal;
?
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
?
import java.net.InetSocketAddress;
import java.util.List;
?
public class CanalClient {
?
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
?
        // 創(chuàng)建canal客戶端,單鏈接模式
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.10.0.10",
                11111), "mall", "", "");
        // 創(chuàng)建連接
        canalConnector.connect();
        while (true) {
            // 訂閱數(shù)據(jù)庫
            // canalConnector.subscribe("mall");
?
            // 獲取數(shù)據(jù)
            Message message = canalConnector.get(100);
?
            // 獲取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();
?
            // 判斷集合是否為空,如果為空,則等待一會繼續(xù)拉取數(shù)據(jù)
            if (entries.size() <= 0) {
//                System.out.println("當次抓取沒有數(shù)據(jù),休息一會。。。。。。");
                Thread.sleep(1000);
            } else {
                // 遍歷entries,單條解析
                for (CanalEntry.Entry entry : entries) {
?
                    //1.獲取表名
                    String tableName = entry.getHeader().getTableName();
?
                    //2.獲取類型
                    CanalEntry.EntryType entryType = entry.getEntryType();
?
                    //3.獲取序列化后的數(shù)據(jù)
                    ByteString storeValue = entry.getStoreValue();
?
                    //4.判斷當前entryType類型是否為ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
?
                        //5.反序列化數(shù)據(jù)
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
?
                        //6.獲取當前事件的操作類型
                        CanalEntry.EventType eventType = rowChange.getEventType();
?
                        //7.獲取數(shù)據(jù)集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
?
                        //8.遍歷rowDataList,并打印數(shù)據(jù)集
                        for (CanalEntry.RowData rowData : rowDataList) {
?
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
?
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
?
                            //數(shù)據(jù)打印
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                    }
                }
            }
        }
    }
}
?
?

控制臺打印如下:

?
Table:brand,EventType:INSERT,Before:{},After:{"image":"","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei","description":"世界第一名族企業(yè)","id":"1","is_delete":""}
?
Table:brand,EventType:UPDATE,Before:{"image":"","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei","description":"世界第一名族企業(yè)","id":"1","is_delete":""},After:{"image":"http://www.baidu.com/image1.png","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei111","description":"世界第一名族企業(yè)","id":"1","is_delete":""}
?
Table:brand,EventType:DELETE,Before:{"image":"http://www.baidu.com/image1.png","update_time":"","category_id":"1","create_time":"","letter":"H","name":"huawei111","description":"世界第一名族企業(yè)","id":"1","is_delete":""},After:{}
?

可以看到canal監(jiān)控到表數(shù)據(jù)的變更方式以及數(shù)據(jù)的前后變化。這樣我們就可以通過canal監(jiān)聽MySQL binlog原理優(yōu)雅實現(xiàn)緩存與數(shù)據(jù)庫數(shù)據(jù)一致性解決方案啦,通過的監(jiān)聽得到數(shù)據(jù)信息進行緩存同步寫操作即可。

4.總結

canal是一個增量數(shù)據(jù)同步組件,其好處就是在于對業(yè)務邏輯無侵入,它是通過把自己偽裝成mysql slave收集binlog實現(xiàn)數(shù)據(jù)同步的。這里要強調(diào)一下異構數(shù)據(jù)源之間要實現(xiàn)數(shù)據(jù)增量同步,同時要保證實時性、低延時在大數(shù)據(jù)領域也是一個令人頭疼的問題,不像全量同步簡單直接全部數(shù)據(jù)寫到目的源就可以了。canal就是為了解決增量同步而生,這是其招牌。當然canal也是有缺點的,只能監(jiān)聽MySQL,其他數(shù)據(jù)庫oracle就不行了。

同時也要指明企業(yè)級的數(shù)據(jù)同步不可能像上面的方式一條條監(jiān)聽數(shù)據(jù)變化同步異構數(shù)據(jù)源如elasticsearch、Hbase的,因為一條條處理同步速度之慢不言而喻,當然通過上面方式同步數(shù)據(jù)到緩存redis是可以的,因為緩存的數(shù)據(jù)一般變化不頻繁且數(shù)據(jù)量不大,但同步其他大數(shù)據(jù)組件就一般都需要批量同步了,這時候就需要借助消息隊列中間件如kafka進行數(shù)據(jù)堆積從而實現(xiàn)批量同步,canal+kafka+elasticsearch這個架構是當下許多企業(yè)中較常見的一種數(shù)據(jù)同步的方案。

以上就是SpringBoot整合canal實現(xiàn)數(shù)據(jù)緩存一致性解決方案的詳細內(nèi)容,更多關于SpringBoot canal數(shù)據(jù)緩存一致性的資料請關注腳本之家其它相關文章!

相關文章

  • MyBatis-Plus之邏輯刪除的實現(xiàn)

    MyBatis-Plus之邏輯刪除的實現(xiàn)

    這篇文章主要介紹了MyBatis-Plus之邏輯刪除的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-11-11
  • mapper.xml無法解析字段的問題

    mapper.xml無法解析字段的問題

    這篇文章主要介紹了mapper.xml無法解析字段的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-06-06
  • 使用springboot在工具類中讀取配置文件(ClassPathResource)

    使用springboot在工具類中讀取配置文件(ClassPathResource)

    這篇文章主要介紹了使用springboot在工具類中讀取配置文件(ClassPathResource),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Java實體類之間的相互轉換方式

    Java實體類之間的相互轉換方式

    這篇文章主要介紹了Java實體類之間的相互轉換方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • 關于springboot配置druid數(shù)據(jù)源不生效問題(踩坑記)

    關于springboot配置druid數(shù)據(jù)源不生效問題(踩坑記)

    今天日常跟著網(wǎng)課學習,學到了整合druid數(shù)據(jù)源,遇到了好幾個坑,希望這篇文章可以幫助一些和我一樣踩坑的人
    2021-09-09
  • java JVM原理與常識知識點

    java JVM原理與常識知識點

    在本文中小編給大家分享的是關于java的JVM原理和java常識,有興趣的朋友們可以學習下
    2018-12-12
  • 使用Jackson-json解析一個嵌套的json字符串

    使用Jackson-json解析一個嵌套的json字符串

    這篇文章主要介紹了使用Jackson-json解析一個嵌套的json字符串,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • java普通項目讀取不到resources目錄下資源文件的解決辦法

    java普通項目讀取不到resources目錄下資源文件的解決辦法

    這篇文章主要給大家介紹了關于java普通項目讀取不到resources目錄下資源文件的解決辦法,Web項目中應該經(jīng)常有這樣的需求,在maven項目的resources目錄下放一些文件,比如一些配置文件,資源文件等,需要的朋友可以參考下
    2023-09-09
  • SpringBoot項目創(chuàng)建使用+配置文件+日志文件詳解

    SpringBoot項目創(chuàng)建使用+配置文件+日志文件詳解

    Spring的出現(xiàn)是為了簡化 Java 程序開發(fā),而 SpringBoot 的出現(xiàn)是為了簡化 Spring 程序開發(fā),這篇文章主要介紹了SpringBoot項目創(chuàng)建使用+配置文件+日志文件,需要的朋友可以參考下
    2023-02-02
  • Java如何使用正則表達式查找指定字符串

    Java如何使用正則表達式查找指定字符串

    在軟件開發(fā)中正則表達式是個很有用的功能,使用正則表達式可以簡化代碼,省去不少時間,下面這篇文章主要給大家介紹了關于Java如何使用正則表達式查找指定字符串的相關資料,需要的朋友可以參考下
    2022-09-09

最新評論