MySQL數(shù)據(jù)變化監(jiān)聽(tīng)的實(shí)現(xiàn)方案
1. binlog 簡(jiǎn)介
1.1 什么是 binlog?
binlog(Binary Log) 是 MySQL 記錄 DDL(數(shù)據(jù)定義語(yǔ)言,如 CREATE
、ALTER
)和 DML(數(shù)據(jù)操作語(yǔ)言,如 INSERT
、UPDATE
、DELETE
)的日志文件,它用于:
- 主從復(fù)制:MySQL 主庫(kù)將 binlog 傳輸?shù)綇膸?kù),實(shí)現(xiàn)數(shù)據(jù)同步。
- 數(shù)據(jù)恢復(fù):通過(guò)
mysqlbinlog
工具解析 binlog 恢復(fù)數(shù)據(jù)。 - 數(shù)據(jù)同步:第三方工具(如 Canal)解析 binlog,進(jìn)行數(shù)據(jù)同步。
1.2 binlog 的三種格式
binlog 格式 | 說(shuō)明 |
---|---|
STATEMENT | 記錄 SQL 語(yǔ)句本身 |
ROW | 記錄行數(shù)據(jù)變更(推薦) |
MIXED | 結(jié)合前兩者,MySQL 自動(dòng)判斷 |
由于 ROW 格式能提供精確的行級(jí)別變更信息,因此推薦使用它。
2. 開(kāi)啟 binlog 并配置 MySQL
2.1 檢查 binlog 是否開(kāi)啟
SHOW VARIABLES LIKE 'log_bin';
如果 log_bin
值為 OFF
,說(shuō)明 binlog 未開(kāi)啟。
2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
在 [mysqld]
部分添加以下內(nèi)容:
server-id=1 log-bin=mysql-bin binlog-format=ROW binlog-row-image=FULL expire_logs_days=7
重啟 MySQL:
systemctl restart mysql # Linux net stop mysql && net start mysql # Windows
2.3 驗(yàn)證 binlog 配置
執(zhí)行:
SHOW BINARY LOGS;
如果有 binlog 文件,如 mysql-bin.000001
,說(shuō)明已開(kāi)啟。
3. 使用 Java 監(jiān)聽(tīng) binlog
3.1 選擇工具:Canal
阿里巴巴開(kāi)源的 Canal 可以模擬 MySQL 從庫(kù)協(xié)議,解析 binlog 并實(shí)時(shí)推送增量數(shù)據(jù)。
3.2 Java 代碼監(jiān)聽(tīng) binlog
引入 Maven 依賴
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.6</version> </dependency> </dependencies>
編寫(xiě) Java 代碼
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class BinlogListener { public static void main(String[] args) { // 連接 Canal CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal"); try { connector.connect(); connector.subscribe(".*\\..*"); // 監(jiān)聽(tīng)所有庫(kù)表 connector.rollback(); while (true) { Message message = connector.getWithoutAck(100); // 獲取數(shù)據(jù) long batchId = message.getId(); List<CanalEntry.Entry> entries = message.getEntries(); if (batchId != -1 && !entries.isEmpty()) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { processEntry(entry); } } } connector.ack(batchId); // 確認(rèn)消息 } } finally { connector.disconnect(); } } private static void processEntry(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println("變更表:" + entry.getHeader().getTableName()); System.out.println("變更類(lèi)型:" + eventType); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList()); } else { System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList()); System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList()); } } } catch (Exception e) { e.printStackTrace(); } } }
4. 代碼解析
創(chuàng)建 Canal 連接
CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");
127.0.0.1
:Canal 服務(wù)器地址11111
:Canal 端口example
:Canal 實(shí)例canal/canal
:默認(rèn)賬號(hào)密碼
獲取 binlog 變更數(shù)據(jù)
Message message = connector.getWithoutAck(100);
getWithoutAck(100)
:拉取 100 條 binlog 事件。
解析 binlog
for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { processEntry(entry); } }
僅處理
ROWDATA
類(lèi)型的變更,忽略事務(wù)等其他信息。分類(lèi)處理
INSERT
、UPDATE
、DELETE
if (eventType == CanalEntry.EventType.DELETE) { System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList()); } else { System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList()); System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList()); }
總結(jié)
- MySQL binlog 記錄數(shù)據(jù)庫(kù)變更,可用于監(jiān)聽(tīng)增量數(shù)據(jù)。
- Canal 作為 MySQL 從庫(kù)解析 binlog,實(shí)現(xiàn)數(shù)據(jù)同步。
- Java 代碼示例 展示如何用 Canal 監(jiān)聽(tīng)
INSERT
、UPDATE
、DELETE
操作,并解析變更數(shù)據(jù)。
這種方案適用于 分布式數(shù)據(jù)同步、緩存一致性 和 數(shù)據(jù)變更通知,是實(shí)時(shí)數(shù)據(jù)處理的重要手段。
以上就是MySQL數(shù)據(jù)變化監(jiān)聽(tīng)的實(shí)現(xiàn)方案的詳細(xì)內(nèi)容,更多關(guān)于MySQL數(shù)據(jù)變化監(jiān)聽(tīng)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MySQL中int(10)和int(11)的區(qū)別詳解
本文主要介紹了MySQL中int(10)和int(11)的區(qū)別詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03MySQL中UPDATE與DELETE語(yǔ)句的使用教程
這篇文章主要介紹了MySQL中UPDATE與DELETE語(yǔ)句的使用教程,是MySQL入門(mén)學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-12-12提升MYSQL查詢效率的10個(gè)SQL語(yǔ)句優(yōu)化技巧
MySQL數(shù)據(jù)庫(kù)執(zhí)行效率對(duì)程序的執(zhí)行速度有很大的影響,有效的處理優(yōu)化數(shù)據(jù)庫(kù)是非常有用的。尤其是大量數(shù)據(jù)需要處理的時(shí)候2018-03-03MySQL下200GB大表備份的操作(利用傳輸表空間解決停服發(fā)版表備份問(wèn)題)
這篇文章主要介紹了MySQL下200GB大表備份的操作(利用傳輸表空間解決停服發(fā)版表備份問(wèn)題),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2025-04-04MySQL Order by 語(yǔ)句用法與優(yōu)化詳解
Order by語(yǔ)句是用來(lái)排序的,經(jīng)常我們會(huì)使用到Order by來(lái)進(jìn)行排序,下面我給大家來(lái)講講Order by用法與優(yōu)化排序,有需要的同學(xué)可參考2013-06-06MySQL Json類(lèi)型字段IN查詢分組優(yōu)化
這篇文章主要為大家介紹了MySQL Json類(lèi)型字段IN查詢分組優(yōu)化,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08