解析SQL?Server?CDC配合Kafka?Connect監(jiān)聽數據變化的問題
寫在前面
好久沒更新Blog了,從CRUD Boy轉型大數據開發(fā),拉寬了不少的知識面,從今年年初開始籌備、組建、招兵買馬,到現在穩(wěn)定開搞中,期間踏過無數的火坑,也許除了這篇還很寫上三四篇。
進入主題,通常企業(yè)為了實現數據統(tǒng)計、數據分析、數據挖掘、解決信息孤島等全局數據的系統(tǒng)化運作管理 ,為BI、經營分析、決策支持系統(tǒng)等深度開發(fā)應用奠定基礎,挖掘數據價值 ,企業(yè)會開始著手建立數據倉庫,數據中臺。而這些數據來源則來自于企業(yè)的各個業(yè)務系統(tǒng)的數據或爬取外部的數據,從業(yè)務系統(tǒng)數據到數據倉庫的過程就是一個ETL(Extract-Transform-Load)行為,包括了采集、清洗、數據轉換等主要過程,通常異構數據抽取轉換使用Sqoop、DataX等,日志采集Flume、Logstash、Filebeat等。
數據抽取分為全量抽取和增量抽取,全量抽取類似于數據遷移或數據復制,全量抽取很好理解;增量抽取在全量的基礎上做增量,只監(jiān)聽、捕捉動態(tài)變化的數據。如何捕捉數據的變化是增量抽取的關鍵,一是準確性,必須保證準確的捕捉到數據的動態(tài)變化,二是性能,不能對業(yè)務系統(tǒng)造成太大的壓力。
增量抽取方式
通常增量抽取有幾種方式,各有優(yōu)缺點。
1. 觸發(fā)器
在源數據庫上的目標表創(chuàng)建觸發(fā)器,監(jiān)聽增、刪、改操作,捕捉到數據的變更寫入臨時表。
優(yōu)點:操作簡單、規(guī)則清晰,對源表不影響;
缺點:對源數據庫有侵入,對業(yè)務系統(tǒng)有一定的影響;
2. 全表比對
在ETL過程中,抽取方建立臨時表待全量抽取存儲,然后在進行比對數據。
優(yōu)點:對源數據庫、源表都無需改動,完全交付ETL過程處理,統(tǒng)一管理;
缺點:ETL效率低、設計復雜,數據量越大,速度越慢,時效性不確定;
3. 全表刪除后再插入
在抽取數據之前,先將表中數據清空,然后全量抽取。
優(yōu)點:ETL 操作簡單,速度快。
缺點:全量抽取一般采取T+1的形式,抽取數據量大的表容易對數據庫造成壓力;
4. 時間戳
時間戳的方式即在源表上增加時間戳列,對發(fā)生變更的表進行更新,然后根據時間戳進行提取。
優(yōu)點:操作簡單,ELT邏輯清晰,性能比較好;
缺點:對業(yè)務系統(tǒng)有侵入,數據庫表也需要額外增加字段。對于老的業(yè)務系統(tǒng)可能不容易做變更。
5. CDC方式
變更數據捕獲Change Data Capture(簡稱CDC),SQLServer為實時更新數據同步提供了CDC機制,類似于Mysql的binlog,將數據更新操作維護到一張CDC表中。開啟CDC的源表在插入INSERT、更新UPDATE和刪除DELETE活動時會插入數據到日志表中。cdc通過捕獲進程將變更數據捕獲到變更表中,通過cdc提供的查詢函數,可以捕獲這部分數據。詳情可以查看官方介紹:關于變更數據捕獲 (SQL Server)

優(yōu)點:提供易于使用的API 來設置CDC 環(huán)境,縮短ETL 的時間,無需修改業(yè)務系統(tǒng)表結構。
缺點:受數據庫版本的限制,實現過程相對復雜。
CDC增量抽取
先決條件
1. 已搭建好Kafka集群,Zookeeper集群;
2. 源數據庫支持CDC,版本采用開發(fā)版或企業(yè)版。
案例環(huán)境:
Ubuntu 20.04
Kafka2.13-2.7.0
Zookeeper 3.6.2
SQL Server 2012
步驟
除了數據庫開啟CDC支持以外,主要還是要將變更的數據通過Kafka Connect傳輸數據,Debezium是目前官方推薦的連接器,它支持絕大多數主流數據庫:MySQL、PostgreSQL、SQL Server、Oracle等等,詳情查看Connectors。
1. 數據庫步驟
開啟數據庫CDC支持
在源數據庫執(zhí)行以下命令:
EXEC sys.sp_cdc_enable_db GO
附上關閉語句:
exec sys.sp_cdc_disable_db
查詢是否啟用
select * from sys.databases where is_cdc_enabled = 1
創(chuàng)建測試數據表:(已有表則跳過此步驟)
create table T_LioCDC
(
ID int identity(1,1) primary key ,
Name nvarchar(16),
Sex bit,
CreateTime datetime,
UpdateTime datetime
);
對源表開啟CDC支持:
exec sp_cdc_enable_table @source_schema='dbo', @source_name='T_LioCDC', @role_name=null, @supports_net_changes = 1;
確認是否有權限訪問CDC Table:
EXEC sys.sp_cdc_help_change_data_capture

確認SQL Server Agent已開啟:
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'

以上則完成對數據庫的CDC操作。
2. Kafka步驟
Kafka Connect的工作模式分為兩種,分別是standalone模式和distributed模式。standalone用于單機測試,本文用distributed模式,用于生產環(huán)境。(Kafka必須先運行啟動,再進行以下步驟進行配置。)
下載Sql Server Connector
下載連接器后,創(chuàng)建一個文件夾來存放,解壓到該目錄下即可,例子路徑:/usr/soft/kafka/kafka_2.13_2.7.0/plugins(記住這個路徑,配置中要用到)

下載地址:debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz

編輯connect-distributed.properties配置
修改Kafka connect配置文件,$KAFKA_HOME/config/connect-distributed.properties,變更內容如下:
//kafka集群ip+portbootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092 key.converter.schemas.enable=false value.converter.schemas.enable=false offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=3 offset.storage.cleanup.policy=compact config.storage.topic=connect-configs config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 status.storage.partitions=3 //剛剛下載連接器解壓的路徑 plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins
看到配置中有三個Topic,分別是
config.storage.topic:用以保存connector和task的配置信息,需要注意的是這個主題的分區(qū)數只能是1,而且是有多副本的。
offset.storage.topic:用以保存offset信息。
status.storage.topic:用以保存connetor的狀態(tài)信息。
這些Topic可以不用創(chuàng)建,啟動后會默認創(chuàng)建。
啟動Kafka集群
保存配置之后,將connect-distributed.properties分發(fā)到集群中,然后啟動:
bin/connect-distributed.sh config/connect-distributed.properties
檢查是否啟動
connector支持REST API的方式進行管理,所以用Post man或者Fiddler可以調用相關接口進行管理。檢查是否啟動:

不用奇怪,上面配置集群的IP是172段,這里的192.168.1.177仍是我的集群中的一個服務器,因為服務器都使用了雙網卡。因為還沒有連接器相關配置,所以接口返回是一個空數組,接下來將新增一個連接器。
編寫sqlserver-cdc-source.json
{
"name": "sqlserver-cdc-source",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"database.server.name" : "JnServer",
"database.hostname" : "172.192.20.2", --目標數據庫的ip
"database.port" : "1433", --目標數據庫的端口
"database.user" : "sa", --目標數據庫的賬號
"database.password" : "123456", --密碼
"database.dbname" : "Dis", --目標數據庫的數據庫名稱
"table.whitelist": "dbo.T_LioCDC", --監(jiān)聽表名
"schemas.enable" : "false",
"mode":"incrementing", --增量模式
"incrementing.column.name": "ID", --增量列名
"database.history.kafka.bootstrap.servers" : "172.192.10.210:9092,172.192.10.211:9092,172.192.10.212", --kafka集群
"database.history.kafka.topic": "TopicTLioCDC", --kafka topic內部使用,不是由消費者使用
"value.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}
//源文地址:?https://www.cnblogs.com/EminemJK/p/14688907.html
還有其他額外的配置,可以參考官方文檔。然后執(zhí)行

繼續(xù)執(zhí)行檢查,就發(fā)現連接器已經成功配置了:

其他API
GET /connectors – 返回所有正在運行的connector名。
POST /connectors – 新建一個connector; 請求體必須是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必須包含你的connector的配置信息。
GET /connectors/{name} – 獲取指定connetor的信息。
GET /connectors/{name}/config – 獲取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。
GET /connectors/{name}/status – 獲取指定connector的狀態(tài),包括它是否在運行、停止、或者失敗,如果發(fā)生錯誤,還會列出錯誤的具體信息。
GET /connectors/{name}/tasks – 獲取指定connector正在運行的task。
GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態(tài)信息。
PUT /connectors/{name}/pause – 暫停connector和它的task,停止數據處理知道它被恢復。
PUT /connectors/{name}/resume – 恢復一個被暫停的connector。
POST /connectors/{name}/restart – 重啟一個connector,尤其是在一個connector運行失敗的情況下比較常用
POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個task,一般是因為它運行失敗才這樣做。
DELETE /connectors/{name} – 刪除一個connector,停止它的所有task并刪除配置。//源文地址:?https://www.cnblogs.com/EminemJK/p/14688907.html
查看Topic
/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000

TopicJnServer.dbo.T_LioCDC則是供我們消費的主題,啟動一個消費者進行監(jiān)聽測試:
bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092? --consumer-property group.id=group1 --consumer-property client.id=consumer-1? --topic JnServer.dbo.T_LioCDC
然后再源表進行一些列增刪改操作,
--測試代碼
insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('A',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('B',0,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('C',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('D',0,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('E',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('F',1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('G',0,getdate(),getdate())
update T_LioCDC
set Name='Lio.Huang',UpdateTime=getdate()
where ID=7

已經成功捕捉到數據的變更,對比幾個操作Json,依次是insert、update、delete:



到此這篇關于SQL?Server?CDC配合Kafka?Connect監(jiān)聽數據變化的文章就介紹到這了,更多相關SQL?Server?CDC監(jiān)聽數據變化內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SQLServer 優(yōu)化SQL語句 in 和not in的替代方案
用IN寫出來的SQL的優(yōu)點是比較容易寫及清晰易懂,這比較適合現代軟件開發(fā)的風格。2010-04-04

