亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

canal實(shí)現(xiàn)mysql數(shù)據(jù)同步的詳細(xì)過(guò)程

 更新時(shí)間:2025年06月12日 15:14:30   作者:那些樂(lè)趣  
這篇文章主要介紹了canal實(shí)現(xiàn)mysql數(shù)據(jù)同步的詳細(xì)過(guò)程,本文通過(guò)實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧

1、canal下載

canal實(shí)現(xiàn)mysql數(shù)據(jù)同步可以直接安裝canal server就可以了,但是為了方便管理(instance配置,canal server狀態(tài)管理,集群等),需要安裝canal admin,應(yīng)用下載地址:Releases · alibaba/canal · GitHub

進(jìn)入頁(yè)面可以選擇需要安裝的版本

下載canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz

2、mysql同步用戶(hù)創(chuàng)建和授權(quán)

登錄mysql
mysql -h 127.0.0.1 -P 3306 -u root -p
創(chuàng)建同步用戶(hù) repl 密碼設(shè)為123456
CREATE USER 'repl'@'%' IDENTIFIED BY '123456';
給予同步權(quán)限
GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456';
給予repl只讀test庫(kù)的權(quán)限,test庫(kù)是用來(lái)同步數(shù)據(jù)的
GRANT SELECT ON test.* to 'repl'@'%' identified by '123456';
canal_manager是canal admin需要的,給予repl對(duì)該庫(kù)的讀寫(xiě)權(quán)限
GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456';
mysql my.cnf配置文件增加主從配置master數(shù)據(jù)庫(kù)的配置信息
#主數(shù)據(jù)主從配置 唯一id
server_id=1
#開(kāi)啟logbin
log-bin=mysql-bin
#寫(xiě)入模式 row
binlog-format=ROW
#需要同步的庫(kù)
binlog-do-db=test
#忽略的數(shù)據(jù)庫(kù)
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema

在canal-admin解壓文件的conf中有一個(gè)canal_manager.sql,導(dǎo)入到master數(shù)據(jù)庫(kù)

3、canal admin安裝和啟動(dòng)

把canal.admin-1.1.8.tar.gz上傳到linux

解壓 tar -zvxf canal.admin-1.1.8.tar.gz 

進(jìn)入conf目錄下,編輯application.yml配置文件。

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: repl
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: 123456

重點(diǎn)介紹以下幾個(gè)參數(shù):

address:我們需要訂閱(也就是mysql master服務(wù)器)mysql所在的服務(wù)器IP和數(shù)據(jù)庫(kù)端口。

database:canal.admin web系統(tǒng)必須的幾張表,需要在mysql master服務(wù)器上初始化conf/canal_manager.sql文件。

sername和password就是mysql master服務(wù)器創(chuàng)建的用于復(fù)制的用戶(hù)和密碼,也就是我們?cè)赾anal server中配置的repl 和 123456。

driver-class-name:mysql的驅(qū)動(dòng),默認(rèn)是MYSQL5的驅(qū)動(dòng),如果你的MYSQL是8的(我的就是),要將驅(qū)動(dòng)改為com.mysql.cj.jdbc.Driver。

另外,還需要在mysql連接后面加上allowPublicKeyRetrieval=true,不然啟動(dòng)時(shí),有可能報(bào)錯(cuò)。

啟動(dòng)canal.admin

進(jìn)入bin目錄,執(zhí)行如下命令,啟動(dòng)canal.admin:

./startup.sh

查看 admin 日志

2022-12-10 03:13:58.995 [main] INFO  o.s.jmx.export.annotation.AnnotationMBeanExporter - 
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)

如果出現(xiàn)上述日志,說(shuō)明啟動(dòng)成功!

登錄admin

通過(guò)http://127.0.0.1:8089/訪問(wèn),默認(rèn)密碼:admin/123456。

注意,IP和密碼需要改成你自己配置的。如果是在服務(wù)器上配置的,別忘記放開(kāi)8089端口。

輸入用戶(hù)名和密碼之后,出現(xiàn)上述頁(yè)面說(shuō)明配置成功!

如果需要修改密碼,直接通過(guò)執(zhí)行 select upper(sha1(unhex(sha1('1234567')))) 這個(gè)sql得到結(jié)果,然后復(fù)制到canal_manager庫(kù)的canal_user表的password字段中就可以了,其中1234567是明文密碼,執(zhí)行上述sql會(huì)得到一個(gè)密碼。

4、canal server安裝和啟動(dòng)

把canal.deployer-1.1.8.tar.gz上傳到linux

解壓 tar -zvxf ccanal.deployer-1.1.8.tar.gz

進(jìn)入conf目錄下,編輯canal.properties配置文件。

注意,如果直接編輯canal.properties,可能無(wú)法啟動(dòng),報(bào)如下錯(cuò)誤:

可以通過(guò)如下方式修改

mv canal.properties canal.properties_bak
cp canal_local.properties canal.properties
vim canal.properties

canal.properties文件全部?jī)?nèi)容如下:

# register ip
canal.register.ip =
# canal admin config  canalAdmin 的鏈接、端口、用戶(hù)名和MD5密碼
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
# admin auto register canal server啟動(dòng)后自動(dòng)注入到canal admin管理模塊
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =

一般只需要修改下面這3個(gè)

canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB

啟動(dòng)canal.server

進(jìn)入bin目錄,執(zhí)行如下命令,啟動(dòng)canal.server:

./startup.sh

查看canal日志

啟動(dòng)后,canalAdmin的server管理模塊,對(duì)應(yīng)創(chuàng)建的canal server會(huì)動(dòng)態(tài)識(shí)別到,狀態(tài)變?yōu)閱?dòng)

5、canal數(shù)據(jù)同步

5.1、java 端集成監(jiān)聽(tīng)canal 同步的mysql數(shù)據(jù)

1、引入依賴(lài)

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2、編寫(xiě)測(cè)試代碼

package com.hy.das.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean{
    private final static int BATCH_SIZE = 1000;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 創(chuàng)建鏈接 此處的11111為tcp端口 在canal admin Server管理模塊可以查看
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "test", "", "");
        try {
            //打開(kāi)連接
            connector.connect();
            //訂閱數(shù)據(jù)庫(kù)表,全部表
            connector.subscribe(".*\\..*");
            //回滾到未進(jìn)行ack的地方,下次fetch的時(shí)候,可以從最后一個(gè)沒(méi)有ack的地方開(kāi)始拿
            connector.rollback();
            while (true) {
                // 獲取指定數(shù)量的數(shù)據(jù)
                Message message = connector.getWithoutAck(BATCH_SIZE);
                System.out.println(message.getEntries().size());
                //獲取批量ID
                long batchId = message.getId();
                //獲取批量的數(shù)量
                int size = message.getEntries().size();
                //如果沒(méi)有數(shù)據(jù)
                if (batchId == -1 || size == 0) {
                    try {
                        //線程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("----------------");
                    //如果有數(shù)據(jù),處理數(shù)據(jù)
                    //遍歷entries,單條解析
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        //獲取表名
                        String tableName = entry.getHeader().getTableName();
                        //獲取類(lèi)型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //獲取序列化后的數(shù)據(jù)
                        ByteString storeValue = entry.getStoreValue();
                        //判斷entry類(lèi)型是否為ROWDATA類(lèi)型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //獲取當(dāng)前事件操作類(lèi)型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //獲取數(shù)據(jù)集
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            //遍歷
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改變前數(shù)據(jù)
                                JSONObject jsonObjectBefore = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改變后數(shù)據(jù)
                                JSONObject jsonObjectAfter = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("當(dāng)前操作類(lèi)型為:"+entryType);
                        }
                    }
                }
                //進(jìn)行 batch id 的確認(rèn)。確認(rèn)之后,小于等于此 batchId 的 Message 都會(huì)被確認(rèn)。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

newSingleConnector方法里面的test是一個(gè)instance實(shí)列,定義了需要同步的master庫(kù)的信息(ip、端口、用戶(hù)名、密碼、binlog文件名稱(chēng)、同步位置、需要同步的庫(kù)、不需要同步的庫(kù)等)

在canal admin web管理界面的Instance 管理模塊,點(diǎn)擊新建Instance進(jìn)行創(chuàng)建,新建頁(yè)面的Instance名稱(chēng)就是test,這個(gè)可以隨便填寫(xiě),代碼對(duì)應(yīng)修改就行,所屬集群/主機(jī),因?yàn)槲疫@里是單機(jī)部署,直接選擇自動(dòng)注入的canal server就行,點(diǎn)擊載入模板,獲取配置初始信息,下圖中標(biāo)出的信息按照實(shí)際的修改填入就行,點(diǎn)擊保存后,啟動(dòng)這個(gè)Instance。

3、啟動(dòng)服務(wù),對(duì)test庫(kù)的sys_user表進(jìn)行數(shù)據(jù)更新,可以看到后臺(tái)已經(jīng)收到變更數(shù)據(jù)

5.2、kafka同步數(shù)據(jù)

1:canal.properties配置文件增加如下配置

#數(shù)據(jù)變更發(fā)送到kafka
# 設(shè)置輸出目標(biāo)為 kafka
canal.serverMode = kafka
# Kafka 地址
canal.mq.servers  = xx.xx.xx.xx:9092
# 投遞失敗的重試次數(shù),默認(rèn)0,改為2
canal.mq.retries = 2
# Kafka batch.size,即producer一個(gè)微批次的大小,默認(rèn)16K,這里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一個(gè)請(qǐng)求的最大大小,默認(rèn)1M,這里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender線程在檢查微批次是否就緒時(shí)的超時(shí),默認(rèn)0ms,這里改為200ms
# 滿(mǎn)足batch.size和linger.ms其中之一,就會(huì)發(fā)送消息
canal.mq.lingerMs = 200
# Kafka buffer.memory,緩存大小,默認(rèn)32M
canal.mq.bufferMemory = 33554432
# 獲取binlog數(shù)據(jù)的批次大小,默認(rèn)50
canal.mq.canalBatchSize = 50
# 獲取binlog數(shù)據(jù)的超時(shí)時(shí)間,默認(rèn)200ms
canal.mq.canalGetTimeout = 200
# 是否將binlog轉(zhuǎn)為JSON格式。如果為false,就是原生Protobuf格式
canal.mq.flatMessage = true
# 壓縮類(lèi)型,官方文檔沒(méi)有描述
canal.mq.compressionType = none
# Kafka acks,默認(rèn)all,表示分區(qū)leader會(huì)等所有follower同步完才給producer發(fā)送ack
# 0表示不等待ack,1表示leader寫(xiě)入完畢之后直接ack
canal.mq.acks = all
# Kafka消息投遞是否使用事務(wù)
# 主要針對(duì)flatMessage的異步發(fā)送和動(dòng)態(tài)多topic消息投遞進(jìn)行事務(wù)控制來(lái)保持和Canal binlog位置的一致性
# flatMessage模式下建議開(kāi)啟
canal.mq.transaction = true

2:在canal admin web界面修改instance mq配置,增加數(shù)據(jù)同步到kakfa的topic

3:如上兩步配置完成重啟后,在kafka監(jiān)聽(tīng)配置的topic就可以接收到數(shù)據(jù)了

6、java tcp同步只是其中一種方式,還可以通過(guò)kafka、rabbitmq等方式進(jìn)行數(shù)據(jù)同步

注意上面需要提供對(duì)外訪問(wèn)的端口需要開(kāi)通安全組,比如8089、11111等端口。

參考文章:

【CanalAdmin部署文檔】

https://zhuanlan.zhihu.com/p/590705531

到此這篇關(guān)于canal實(shí)現(xiàn)mysql數(shù)據(jù)同步的文章就介紹到這了,更多相關(guān)canal mysql數(shù)據(jù)同步內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • MySql5.7.18字符集配置圖文詳解

    MySql5.7.18字符集配置圖文詳解

    本文通過(guò)圖文并茂的形式給大家介紹了mysql5.7.18字符集配置教程,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下吧
    2017-06-06
  • Mysql查看死鎖與解除死鎖的深入講解

    Mysql查看死鎖與解除死鎖的深入講解

    這篇文章主要給大家介紹了關(guān)于Mysql查看死鎖與解除死鎖的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • mysql數(shù)據(jù)庫(kù)中字段的注釋和類(lèi)型長(zhǎng)度獲取方式

    mysql數(shù)據(jù)庫(kù)中字段的注釋和類(lèi)型長(zhǎng)度獲取方式

    這篇文章主要介紹了mysql數(shù)據(jù)庫(kù)中字段的注釋和類(lèi)型長(zhǎng)度獲取方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • 關(guān)于mysql查詢(xún)字符集不匹配問(wèn)題的解決方法

    關(guān)于mysql查詢(xún)字符集不匹配問(wèn)題的解決方法

    這篇文章主要給大家介紹了關(guān)于mysql查詢(xún)字符集不匹配問(wèn)題的解決方法,文中通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)同樣遇到這個(gè)問(wèn)題的朋友們具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-08-08
  • Mysql5.7忘記root密碼怎么辦(簡(jiǎn)單且有效方法)

    Mysql5.7忘記root密碼怎么辦(簡(jiǎn)單且有效方法)

    本文給大家分享一個(gè)快速且簡(jiǎn)單的方法來(lái)解決Mysql5.7忘記root密碼問(wèn)題,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下吧
    2017-02-02
  • 詳解如何利用amoeba(變形蟲(chóng))實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)讀寫(xiě)分離

    詳解如何利用amoeba(變形蟲(chóng))實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)讀寫(xiě)分離

    這篇文章主要介紹了詳解如何利用amoeba(變形蟲(chóng))實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)讀寫(xiě)分離,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-05-05
  • 基于MySQL Master Slave同步配置的操作詳解

    基于MySQL Master Slave同步配置的操作詳解

    本篇文章是對(duì)MySQL Master Slave 同步配置進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下
    2013-06-06
  • MySQL分區(qū)表的正確使用方法

    MySQL分區(qū)表的正確使用方法

    這篇文章主要給大家介紹了關(guān)于MySQL分區(qū)表的正確使用方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-01-01
  • MySQL分頁(yè)技術(shù)、6種分頁(yè)方法總結(jié)

    MySQL分頁(yè)技術(shù)、6種分頁(yè)方法總結(jié)

    這篇文章主要介紹了MySQL分頁(yè)技術(shù)、6種分頁(yè)方法總結(jié),本文總結(jié)了6種分頁(yè)的方法并分別一一講解它們的特點(diǎn),需要的朋友可以參考下
    2015-07-07
  • 最新版MySQL 8.0.22下載安裝超詳細(xì)教程(Windows 64位)

    最新版MySQL 8.0.22下載安裝超詳細(xì)教程(Windows 64位)

    這篇文章主要介紹了最新版MySQL 8.0.22下載安裝超詳細(xì)教程(Windows 64位),本文通過(guò)圖文實(shí)例相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-12-12

最新評(píng)論