canal實(shí)現(xiàn)mysql數(shù)據(jù)同步的詳細(xì)過(guò)程
1、canal下載
canal實(shí)現(xiàn)mysql數(shù)據(jù)同步可以直接安裝canal server就可以了,但是為了方便管理(instance配置,canal server狀態(tài)管理,集群等),需要安裝canal admin,應(yīng)用下載地址:Releases · alibaba/canal · GitHub
進(jìn)入頁(yè)面可以選擇需要安裝的版本
下載canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz
2、mysql同步用戶(hù)創(chuàng)建和授權(quán)
登錄mysql mysql -h 127.0.0.1 -P 3306 -u root -p 創(chuàng)建同步用戶(hù) repl 密碼設(shè)為123456 CREATE USER 'repl'@'%' IDENTIFIED BY '123456'; 給予同步權(quán)限 GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456'; 給予repl只讀test庫(kù)的權(quán)限,test庫(kù)是用來(lái)同步數(shù)據(jù)的 GRANT SELECT ON test.* to 'repl'@'%' identified by '123456'; canal_manager是canal admin需要的,給予repl對(duì)該庫(kù)的讀寫(xiě)權(quán)限 GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456'; mysql my.cnf配置文件增加主從配置master數(shù)據(jù)庫(kù)的配置信息 #主數(shù)據(jù)主從配置 唯一id server_id=1 #開(kāi)啟logbin log-bin=mysql-bin #寫(xiě)入模式 row binlog-format=ROW #需要同步的庫(kù) binlog-do-db=test #忽略的數(shù)據(jù)庫(kù) replicate-ignore-db=mysql replicate-ignore-db=sys replicate-ignore-db=information_schema replicate-ignore-db=performance_schema
在canal-admin解壓文件的conf中有一個(gè)canal_manager.sql,導(dǎo)入到master數(shù)據(jù)庫(kù)
3、canal admin安裝和啟動(dòng)
把canal.admin-1.1.8.tar.gz上傳到linux
解壓 tar -zvxf canal.admin-1.1.8.tar.gz
進(jìn)入conf目錄下,編輯application.yml配置文件。
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: repl password: 123456 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: 123456
重點(diǎn)介紹以下幾個(gè)參數(shù):
address:我們需要訂閱(也就是mysql master服務(wù)器)mysql所在的服務(wù)器IP和數(shù)據(jù)庫(kù)端口。
database:canal.admin web系統(tǒng)必須的幾張表,需要在mysql master服務(wù)器上初始化conf/canal_manager.sql文件。
sername和password就是mysql master服務(wù)器創(chuàng)建的用于復(fù)制的用戶(hù)和密碼,也就是我們?cè)赾anal server中配置的repl 和 123456。
driver-class-name:mysql的驅(qū)動(dòng),默認(rèn)是MYSQL5的驅(qū)動(dòng),如果你的MYSQL是8的(我的就是),要將驅(qū)動(dòng)改為com.mysql.cj.jdbc.Driver。
另外,還需要在mysql連接后面加上allowPublicKeyRetrieval=true,不然啟動(dòng)時(shí),有可能報(bào)錯(cuò)。
啟動(dòng)canal.admin
進(jìn)入bin目錄,執(zhí)行如下命令,啟動(dòng)canal.admin:
./startup.sh
查看 admin 日志
2022-12-10 03:13:58.995 [main] INFO o.s.jmx.export.annotation.AnnotationMBeanExporter -
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)
如果出現(xiàn)上述日志,說(shuō)明啟動(dòng)成功!
登錄admin
通過(guò)http://127.0.0.1:8089/訪問(wèn),默認(rèn)密碼:admin/123456。
注意,IP和密碼需要改成你自己配置的。如果是在服務(wù)器上配置的,別忘記放開(kāi)8089端口。
輸入用戶(hù)名和密碼之后,出現(xiàn)上述頁(yè)面說(shuō)明配置成功!
如果需要修改密碼,直接通過(guò)執(zhí)行 select upper(sha1(unhex(sha1('1234567')))) 這個(gè)sql得到結(jié)果,然后復(fù)制到canal_manager庫(kù)的canal_user表的password字段中就可以了,其中1234567是明文密碼,執(zhí)行上述sql會(huì)得到一個(gè)密碼。
4、canal server安裝和啟動(dòng)
把canal.deployer-1.1.8.tar.gz上傳到linux
解壓 tar -zvxf ccanal.deployer-1.1.8.tar.gz
進(jìn)入conf目錄下,編輯canal.properties配置文件。
注意,如果直接編輯canal.properties,可能無(wú)法啟動(dòng),報(bào)如下錯(cuò)誤:
可以通過(guò)如下方式修改
mv canal.properties canal.properties_bak cp canal_local.properties canal.properties vim canal.properties
canal.properties文件全部?jī)?nèi)容如下:
# register ip canal.register.ip = # canal admin config canalAdmin 的鏈接、端口、用戶(hù)名和MD5密碼 canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB # admin auto register canal server啟動(dòng)后自動(dòng)注入到canal admin管理模塊 canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
一般只需要修改下面這3個(gè)
canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
啟動(dòng)canal.server
進(jìn)入bin目錄,執(zhí)行如下命令,啟動(dòng)canal.server:
./startup.sh
查看canal日志
啟動(dòng)后,canalAdmin的server管理模塊,對(duì)應(yīng)創(chuàng)建的canal server會(huì)動(dòng)態(tài)識(shí)別到,狀態(tài)變?yōu)閱?dòng)
5、canal數(shù)據(jù)同步
5.1、java 端集成監(jiān)聽(tīng)canal 同步的mysql數(shù)據(jù)
1、引入依賴(lài)
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency>
2、編寫(xiě)測(cè)試代碼
package com.hy.das.config; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; @Component public class CanalClient implements InitializingBean{ private final static int BATCH_SIZE = 1000; @Override public void afterPropertiesSet() throws Exception { // 創(chuàng)建鏈接 此處的11111為tcp端口 在canal admin Server管理模塊可以查看 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "test", "", ""); try { //打開(kāi)連接 connector.connect(); //訂閱數(shù)據(jù)庫(kù)表,全部表 connector.subscribe(".*\\..*"); //回滾到未進(jìn)行ack的地方,下次fetch的時(shí)候,可以從最后一個(gè)沒(méi)有ack的地方開(kāi)始拿 connector.rollback(); while (true) { // 獲取指定數(shù)量的數(shù)據(jù) Message message = connector.getWithoutAck(BATCH_SIZE); System.out.println(message.getEntries().size()); //獲取批量ID long batchId = message.getId(); //獲取批量的數(shù)量 int size = message.getEntries().size(); //如果沒(méi)有數(shù)據(jù) if (batchId == -1 || size == 0) { try { //線程休眠2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println("----------------"); //如果有數(shù)據(jù),處理數(shù)據(jù) //遍歷entries,單條解析 for (CanalEntry.Entry entry : message.getEntries()) { //獲取表名 String tableName = entry.getHeader().getTableName(); //獲取類(lèi)型 CanalEntry.EntryType entryType = entry.getEntryType(); //獲取序列化后的數(shù)據(jù) ByteString storeValue = entry.getStoreValue(); //判斷entry類(lèi)型是否為ROWDATA類(lèi)型 if (CanalEntry.EntryType.ROWDATA.equals(entryType)){ //反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //獲取當(dāng)前事件操作類(lèi)型 CanalEntry.EventType eventType = rowChange.getEventType(); //獲取數(shù)據(jù)集 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); //遍歷 for (CanalEntry.RowData rowData : rowDatasList) { //改變前數(shù)據(jù) JSONObject jsonObjectBefore = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { jsonObjectBefore.put(column.getName(),column.getValue()); } //改變后數(shù)據(jù) JSONObject jsonObjectAfter = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { jsonObjectAfter.put(column.getName(),column.getValue()); } System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter); } }else { System.out.println("當(dāng)前操作類(lèi)型為:"+entryType); } } } //進(jìn)行 batch id 的確認(rèn)。確認(rèn)之后,小于等于此 batchId 的 Message 都會(huì)被確認(rèn)。 connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } } }
newSingleConnector方法里面的test是一個(gè)instance實(shí)列,定義了需要同步的master庫(kù)的信息(ip、端口、用戶(hù)名、密碼、binlog文件名稱(chēng)、同步位置、需要同步的庫(kù)、不需要同步的庫(kù)等)
在canal admin web管理界面的Instance 管理模塊,點(diǎn)擊新建Instance進(jìn)行創(chuàng)建,新建頁(yè)面的Instance名稱(chēng)就是test,這個(gè)可以隨便填寫(xiě),代碼對(duì)應(yīng)修改就行,所屬集群/主機(jī),因?yàn)槲疫@里是單機(jī)部署,直接選擇自動(dòng)注入的canal server就行,點(diǎn)擊載入模板,獲取配置初始信息,下圖中標(biāo)出的信息按照實(shí)際的修改填入就行,點(diǎn)擊保存后,啟動(dòng)這個(gè)Instance。
3、啟動(dòng)服務(wù),對(duì)test庫(kù)的sys_user表進(jìn)行數(shù)據(jù)更新,可以看到后臺(tái)已經(jīng)收到變更數(shù)據(jù)
5.2、kafka同步數(shù)據(jù)
1:canal.properties配置文件增加如下配置
#數(shù)據(jù)變更發(fā)送到kafka # 設(shè)置輸出目標(biāo)為 kafka canal.serverMode = kafka # Kafka 地址 canal.mq.servers = xx.xx.xx.xx:9092 # 投遞失敗的重試次數(shù),默認(rèn)0,改為2 canal.mq.retries = 2 # Kafka batch.size,即producer一個(gè)微批次的大小,默認(rèn)16K,這里加倍 canal.mq.batchSize = 32768 # Kafka max.request.size,即一個(gè)請(qǐng)求的最大大小,默認(rèn)1M,這里也加倍 canal.mq.maxRequestSize = 2097152 # Kafka linger.ms,即sender線程在檢查微批次是否就緒時(shí)的超時(shí),默認(rèn)0ms,這里改為200ms # 滿(mǎn)足batch.size和linger.ms其中之一,就會(huì)發(fā)送消息 canal.mq.lingerMs = 200 # Kafka buffer.memory,緩存大小,默認(rèn)32M canal.mq.bufferMemory = 33554432 # 獲取binlog數(shù)據(jù)的批次大小,默認(rèn)50 canal.mq.canalBatchSize = 50 # 獲取binlog數(shù)據(jù)的超時(shí)時(shí)間,默認(rèn)200ms canal.mq.canalGetTimeout = 200 # 是否將binlog轉(zhuǎn)為JSON格式。如果為false,就是原生Protobuf格式 canal.mq.flatMessage = true # 壓縮類(lèi)型,官方文檔沒(méi)有描述 canal.mq.compressionType = none # Kafka acks,默認(rèn)all,表示分區(qū)leader會(huì)等所有follower同步完才給producer發(fā)送ack # 0表示不等待ack,1表示leader寫(xiě)入完畢之后直接ack canal.mq.acks = all # Kafka消息投遞是否使用事務(wù) # 主要針對(duì)flatMessage的異步發(fā)送和動(dòng)態(tài)多topic消息投遞進(jìn)行事務(wù)控制來(lái)保持和Canal binlog位置的一致性 # flatMessage模式下建議開(kāi)啟 canal.mq.transaction = true
2:在canal admin web界面修改instance mq配置,增加數(shù)據(jù)同步到kakfa的topic
3:如上兩步配置完成重啟后,在kafka監(jiān)聽(tīng)配置的topic就可以接收到數(shù)據(jù)了
6、java tcp同步只是其中一種方式,還可以通過(guò)kafka、rabbitmq等方式進(jìn)行數(shù)據(jù)同步
注意上面需要提供對(duì)外訪問(wèn)的端口需要開(kāi)通安全組,比如8089、11111等端口。
參考文章:
https://zhuanlan.zhihu.com/p/590705531
到此這篇關(guān)于canal實(shí)現(xiàn)mysql數(shù)據(jù)同步的文章就介紹到這了,更多相關(guān)canal mysql數(shù)據(jù)同步內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
mysql數(shù)據(jù)庫(kù)中字段的注釋和類(lèi)型長(zhǎng)度獲取方式
這篇文章主要介紹了mysql數(shù)據(jù)庫(kù)中字段的注釋和類(lèi)型長(zhǎng)度獲取方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01關(guān)于mysql查詢(xún)字符集不匹配問(wèn)題的解決方法
這篇文章主要給大家介紹了關(guān)于mysql查詢(xún)字符集不匹配問(wèn)題的解決方法,文中通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)同樣遇到這個(gè)問(wèn)題的朋友們具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-08-08Mysql5.7忘記root密碼怎么辦(簡(jiǎn)單且有效方法)
本文給大家分享一個(gè)快速且簡(jiǎn)單的方法來(lái)解決Mysql5.7忘記root密碼問(wèn)題,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下吧2017-02-02詳解如何利用amoeba(變形蟲(chóng))實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)讀寫(xiě)分離
這篇文章主要介紹了詳解如何利用amoeba(變形蟲(chóng))實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)讀寫(xiě)分離,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05MySQL分頁(yè)技術(shù)、6種分頁(yè)方法總結(jié)
這篇文章主要介紹了MySQL分頁(yè)技術(shù)、6種分頁(yè)方法總結(jié),本文總結(jié)了6種分頁(yè)的方法并分別一一講解它們的特點(diǎn),需要的朋友可以參考下2015-07-07最新版MySQL 8.0.22下載安裝超詳細(xì)教程(Windows 64位)
這篇文章主要介紹了最新版MySQL 8.0.22下載安裝超詳細(xì)教程(Windows 64位),本文通過(guò)圖文實(shí)例相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12