SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更詳解
需求
在現(xiàn)代分布式系統(tǒng)中,實(shí)時(shí)獲取數(shù)據(jù)庫的變更信息是一個(gè)常見的需求。例如,在電商系統(tǒng)中,當(dāng)訂單表發(fā)生更新時(shí),可能需要同步這些變更到搜索服務(wù)、緩存服務(wù)或者通知其他微服務(wù)。傳統(tǒng)的解決方案包括定時(shí)輪詢數(shù)據(jù)庫或通過觸發(fā)器將變更寫入消息隊(duì)列等方法,但這些方案要么效率低下,要么實(shí)現(xiàn)復(fù)雜。而使用 Canal + RabbitMQ 可以提供一種高效且可靠的方式來捕獲 MySQL 數(shù)據(jù)庫的變更,并將其發(fā)送到 RabbitMQ 中供其他服務(wù)消費(fèi)。
Canal 是阿里巴巴開源的一個(gè)用于增量訂閱和消費(fèi) MySQL 數(shù)據(jù)庫 Binlog 的工具,它模擬 MySQL 主從復(fù)制機(jī)制,無需侵入業(yè)務(wù)邏輯即可捕獲數(shù)據(jù)庫變更。RabbitMQ 是一個(gè)流行的開源消息代理,支持多種協(xié)議并提供了豐富的特性來確保消息傳遞的可靠性。結(jié)合這兩者,可以構(gòu)建一個(gè)強(qiáng)大的實(shí)時(shí)數(shù)據(jù)變更監(jiān)聽和處理系統(tǒng)。
步驟
環(huán)境搭建
整合SpringBoot與Canal實(shí)現(xiàn)客戶端
Canal整合RabbitMQ
SpringBoot整合RabbitMQ
環(huán)境搭建
1. 安裝MySQL
確保你有一個(gè)正在運(yùn)行的 MySQL 實(shí)例,并啟用了 binlog 日志記錄功能。這是 Canal 捕獲數(shù)據(jù)庫變更的基礎(chǔ)。
# 修改 MySQL 配置文件 my.cnf 或 my.ini [mysqld] server-id=1 log-bin=mysql-bin binlog-format=ROW
重啟 MySQL 服務(wù)使配置生效。
2. 安裝Canal Server
下載最新版本的 Canal Server 并解壓到合適的位置。根據(jù)官方文檔進(jìn)行必要的配置,特別是 instance.properties 文件中的數(shù)據(jù)庫連接信息。
3. 安裝RabbitMQ
可以通過 Docker 快速安裝 RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
訪問 http://localhost:15672 登錄管理界面,默認(rèn)用戶名/密碼為 guest/guest。
整合SpringBoot與Canal實(shí)現(xiàn)客戶端
創(chuàng)建SpringBoot項(xiàng)目
使用 Spring Initializr 創(chuàng)建一個(gè)新的 Spring Boot 項(xiàng)目,添加 Web, JPA, 和 AMQP(用于后續(xù)整合 RabbitMQ)依賴。
引入Canal依賴
在 pom.xml 中添加 Canal Client 的依賴:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
編寫Canal客戶端代碼
創(chuàng)建一個(gè) Canal 客戶端類,用來監(jiān)聽 MySQL 數(shù)據(jù)庫的變化,并將變更事件轉(zhuǎn)發(fā)給 RabbitMQ。
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;
public class CanalClient {
private final CanalConnector connector;
private final Channel channel;
public CanalClient(CanalConnector connector, Channel channel) {
this.connector = connector;
this.channel = channel;
}
public void start() {
// Canal 連接配置
connector.connect();
connector.subscribe(".*\\..*"); // 訂閱所有數(shù)據(jù)庫和表
connector.rollback();
while (true) {
int batchSize = 1000;
EntryBatch batch = connector.getWithoutAck(batchSize); // 獲取一批次數(shù)據(jù)
long batchId = batch.getId();
int size = batch.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(batch.getEntries());
connector.ack(batchId); // 提交確認(rèn)
}
if (Thread.currentThread().isInterrupted()) {
break;
}
}
connector.disconnect();
}
private void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
sendToRabbitMQ(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
sendToRabbitMQ(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
sendToRabbitMQ(rowData.getBeforeColumnsList());
System.out.println("-------> after");
sendToRabbitMQ(rowData.getAfterColumnsList());
}
}
}
}
private void sendToRabbitMQ(List<Column> columns) {
StringBuilder message = new StringBuilder();
for (Column column : columns) {
message.append(column.getName()).append("=").append(column.getValue()).append(",");
}
try {
channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}
Canal整合RabbitMQ
配置Canal Server
確保 Canal Server 已正確配置并啟動(dòng),能夠監(jiān)聽 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 實(shí)例,并設(shè)置適當(dāng)?shù)倪^濾規(guī)則。
配置RabbitMQ Exchange
在 RabbitMQ 中創(chuàng)建一個(gè)名為 canal_exchange 的 exchange,類型可以根據(jù)需要選擇,如 fanout, direct, topic 或 headers。
rabbitmqadmin declare exchange name=canal_exchange type=fanout
SpringBoot整合RabbitMQ
添加依賴
確保在 pom.xml 中已經(jīng)包含了 RabbitMQ 的 Spring AMQP 依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置RabbitMQ連接信息
在 application.yml 或 application.properties 中配置 RabbitMQ 的連接參數(shù)。
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
創(chuàng)建消費(fèi)者
編寫一個(gè)消費(fèi)者類來接收來自 RabbitMQ 的消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class CanalMessageConsumer {
@RabbitListener(queues = "canal_queue")
public void receive(String message) {
System.out.println("Received message: " + message);
}
}
配置隊(duì)列和綁定
確保在應(yīng)用程序啟動(dòng)時(shí)自動(dòng)創(chuàng)建所需的隊(duì)列,并將它們綁定到之前創(chuàng)建的 exchange 上。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue canalQueue() {
return new Queue("canal_queue", false);
}
@Bean
public TopicExchange canalExchange() {
return new TopicExchange("canal_exchange");
}
@Bean
public Binding binding(Queue canalQueue, TopicExchange canalExchange) {
return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");
}
}
總結(jié)
通過以上步驟,我們成功地將 Canal 與 RabbitMQ 整合到了 Spring Boot 應(yīng)用程序中。這使得我們可以實(shí)時(shí)監(jiān)聽 MySQL 數(shù)據(jù)庫的變更,并將這些變更作為消息發(fā)布到 RabbitMQ 中供其他微服務(wù)消費(fèi)。這種方法不僅提高了系統(tǒng)的響應(yīng)速度,也簡(jiǎn)化了數(shù)據(jù)同步的過程,降低了開發(fā)和維護(hù)成本。
到此這篇關(guān)于SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更詳解的文章就介紹到這了,更多相關(guān)SpringBoot監(jiān)聽數(shù)據(jù)變更內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot使用Rabbitmq的延時(shí)隊(duì)列+死信隊(duì)列實(shí)現(xiàn)消息延期消費(fèi)
- Springboot使用RabbitMQ實(shí)現(xiàn)關(guān)閉超時(shí)訂單(示例詳解)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)流量消峰
- SpringBoot集成和使用RabbitMQ方式
- springboot整合RabbitMQ中死信隊(duì)列的實(shí)現(xiàn)
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- springboot整合rabbitmq實(shí)現(xiàn)訂單超時(shí)取消案例分析
相關(guān)文章
springboot2.6.4集成swagger3.0遇到的坑及解決方法
這篇文章主要介紹了springboot2.6.4如何集成swagger3.0,在集成的過程中遇到很多問題,本文給大家分享四種問題及相應(yīng)的解決方案,需要的朋友可以參考下2022-03-03
Spring的定時(shí)任務(wù)@Scheduled源碼詳解
這篇文章主要介紹了Spring的定時(shí)任務(wù)@Scheduled源碼詳解,@Scheduled注解是包org.springframework.scheduling.annotation中的一個(gè)注解,主要是用來開啟定時(shí)任務(wù),本文提供了部分實(shí)現(xiàn)代碼與思路,需要的朋友可以參考下2023-09-09
Spring JdbcTemplate執(zhí)行數(shù)據(jù)庫操作詳解
JdbcTemplate是Spring框架自帶的對(duì)JDBC操作的封裝,目的是提供統(tǒng)一的模板方法使對(duì)數(shù)據(jù)庫的操作更加方便、友好,效率也不錯(cuò),這篇文章主要介紹了Spring JdbcTemplate執(zhí)行數(shù)據(jù)庫操作,需要的朋友可以參考下2022-10-10
SpringBoot請(qǐng)求轉(zhuǎn)發(fā)的方式小結(jié)
本文主要介紹了SpringBoot請(qǐng)求轉(zhuǎn)發(fā)的方式,一共有兩大類,一種是controller控制器轉(zhuǎn)發(fā)一種是使用HttpServletRequest進(jìn)行轉(zhuǎn)發(fā),本文就詳細(xì)的介紹一下,感興趣的可以了解一下2023-09-09
IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline)
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline),小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-10-10

