如何利用Java實現(xiàn)MySQL的數(shù)據(jù)變化監(jiān)聽
在高并發(fā)和大數(shù)據(jù)環(huán)境下,實時獲取 MySQL 數(shù)據(jù)庫的增量變化對數(shù)據(jù)同步、數(shù)據(jù)分析、緩存更新等場景至關(guān)重要。MySQL 的 binlog(Binary Log) 記錄了數(shù)據(jù)庫的所有變更,可以用來實現(xiàn) 增量數(shù)據(jù)監(jiān)聽。本文將介紹如何利用 binlog 監(jiān)聽 MySQL 數(shù)據(jù)增量,并提供基于 Java 的 Canal 實現(xiàn)示例。
1.binlog 簡介
1.1 什么是 binlog
binlog(Binary Log) 是 MySQL 記錄 DDL(數(shù)據(jù)定義語言,如 CREATE、ALTER)和 DML(數(shù)據(jù)操作語言,如 INSERT、UPDATE、DELETE)的日志文件,它用于:
- 主從復(fù)制:MySQL 主庫將 binlog 傳輸?shù)綇膸欤瑢崿F(xiàn)數(shù)據(jù)同步。
- 數(shù)據(jù)恢復(fù):通過
mysqlbinlog工具解析 binlog 恢復(fù)數(shù)據(jù)。 - 數(shù)據(jù)同步:第三方工具(如 Canal)解析 binlog,進(jìn)行數(shù)據(jù)同步。
1.2 binlog 的三種格式
| binlog 格式 | 說明 |
|---|---|
| STATEMENT | 記錄 SQL 語句本身 |
| ROW | 記錄行數(shù)據(jù)變更(推薦) |
| MIXED | 結(jié)合前兩者,MySQL 自動判斷 |
由于 ROW 格式能提供精確的行級別變更信息,因此推薦使用它。
2. 開啟 binlog 并配置 MySQL
2.1 檢查 binlog 是否開啟
SHOW VARIABLES LIKE 'log_bin';
如果 log_bin 值為 OFF,說明 binlog 未開啟。
2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
在 [mysqld] 部分添加以下內(nèi)容:
server-id=1 log-bin=mysql-bin binlog-format=ROW binlog-row-image=FULL expire_logs_days=7
重啟 MySQL:
systemctl restart mysql # Linux net stop mysql && net start mysql # Windows
2.3 驗證 binlog 配置
執(zhí)行:
SHOW BINARY LOGS;
如果有 binlog 文件,如 mysql-bin.000001,說明已開啟。
3. 使用 Java 監(jiān)聽 binlog
3.1 選擇工具:Canal
阿里巴巴開源的 Canal 可以模擬 MySQL 從庫協(xié)議,解析 binlog 并實時推送增量數(shù)據(jù)。
3.2 Java 代碼監(jiān)聽 binlog
引入 Maven 依賴
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
</dependencies>編寫 Java 代碼
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class BinlogListener {
public static void main(String[] args) {
// 連接 Canal
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "canal", "canal");
try {
connector.connect();
connector.subscribe(".*\\..*"); // 監(jiān)聽所有庫表
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 獲取數(shù)據(jù)
long batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if (batchId != -1 && !entries.isEmpty()) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
processEntry(entry);
}
}
}
connector.ack(batchId); // 確認(rèn)消息
}
} finally {
connector.disconnect();
}
}
private static void processEntry(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println("變更表:" + entry.getHeader().getTableName());
System.out.println("變更類型:" + eventType);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList());
} else {
System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList());
System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4. 代碼解析
1.創(chuàng)建 Canal 連接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "canal", "canal");
127.0.0.1:Canal 服務(wù)器地址11111:Canal 端口example:Canal 實例canal/canal:默認(rèn)賬號密碼
2.獲取 binlog 變更數(shù)據(jù)
Message message = connector.getWithoutAck(100);
getWithoutAck(100):拉取 100 條 binlog 事件。
3.解析 binlog
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
processEntry(entry);
}
}
僅處理 ROWDATA 類型的變更,忽略事務(wù)等其他信息。
4.分類處理 INSERT、UPDATE、DELETE
if (eventType == CanalEntry.EventType.DELETE) {
System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList());
} else {
System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList());
System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList());
}
總結(jié)
- MySQL binlog 記錄數(shù)據(jù)庫變更,可用于監(jiān)聽增量數(shù)據(jù)。
- Canal 作為 MySQL 從庫解析 binlog,實現(xiàn)數(shù)據(jù)同步。
- Java 代碼示例 展示如何用 Canal 監(jiān)聽
INSERT、UPDATE、DELETE操作,并解析變更數(shù)據(jù)。
這種方案適用于 分布式數(shù)據(jù)同步、緩存一致性 和 數(shù)據(jù)變更通知,是實時數(shù)據(jù)處理的重要手段。
到此這篇關(guān)于如何利用Java實現(xiàn)MySQL的數(shù)據(jù)變化監(jiān)聽的文章就介紹到這了,更多相關(guān)Java監(jiān)聽MySQL數(shù)據(jù)變化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java畢業(yè)設(shè)計實戰(zhàn)之生活旅行分享平臺的實現(xiàn)
這是一個使用了java+Springboot+JPA+Jsp+Html+js+Ajax+maven+mysql開發(fā)的生活旅行分享平臺,是一個畢業(yè)設(shè)計的實戰(zhàn)練習(xí),具有分享發(fā)布平臺該有的所有功能,感興趣的朋友快來看看吧2022-02-02
Java class文件格式之?dāng)?shù)據(jù)類型_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java class文件格式之?dāng)?shù)據(jù)類型的相關(guān)資料,需要的朋友可以參考下2017-06-06
Java zookeeper圖形化工具ZooInspector用法詳解
這篇文章主要介紹了Java zookeeper圖形化工具ZooInspector用法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07
Springboot?整合maven插口調(diào)用maven?release?plugin實現(xiàn)一鍵打包功能
這篇文章主要介紹了Springboot?整合maven插口調(diào)用maven?release?plugin實現(xiàn)一鍵打包功能,整合maven-invoker使程序去執(zhí)行mvn命令,結(jié)合示例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-03-03

