剖析后OpLog訂閱MongoDB的數(shù)據(jù)變更就沒那么難了
前言
我們開源了一個訂閱分發(fā)mysql的binlog的項目,一直用的非常好,忽然有天開發(fā)說能不能支持MongoDB的數(shù)據(jù)訂閱呢,MongoDB的使用度也挺廣泛的。安排。經(jīng)過簡單的了解后發(fā)現(xiàn)MongoDB也有類似binlog的機(jī)制,最終花了兩天時間把功能完成,并統(tǒng)一抽象集成到binlog開源項目中,使用和binlog同一套訂閱分發(fā)模型管理MongoDB數(shù)據(jù)源。整個過程非常順利,比整mysql的binlog要簡單的多了。
oplog簡介
先來聊聊MongoDB的主備機(jī)制,和mysql的binlog類似,在MongoDB中,有一個系統(tǒng)庫“”Local”,庫里有一個集合“oplog.rs”,這個集合類似于binlog文件,里面記錄了MongoDB的所有操作。從節(jié)點通過讀取oplog.rs里的數(shù)據(jù)做到數(shù)據(jù)同步。
解析oplog
和訂閱mysql的binlog一樣(模擬一個從節(jié)點mysql)。我們的訂閱服務(wù)要像從節(jié)點那樣讀取解析oplog.rs里的數(shù)據(jù)。解析前先看下oplog.rs的Document的數(shù)據(jù)結(jié)構(gòu)
上圖是一個插入的數(shù)據(jù)的日志,可見oplog的doc中共有如下字段,含義分別如下:
ts
:操作的時間戳(非常重要)
t
:term最初在主數(shù)據(jù)庫上生成操作的。(含義不明)
h
:本次操作的唯一hashID
v
: 版本號
op
:操作類型,有六種類型,我們只需要關(guān)注其中的i(插入)、u(更新)、d(刪除)即可
ns
:庫名和集合名稱,中間使用“.”連接
o
:本次操作的document內(nèi)容
o2
:只有op操作類型時u更新時,才會有這個字段,代表更新的條件語句
$set
:o2獲取后的文檔里的屬性,代表更新的字段
如上字段,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以類比為binlog中的Position。同步mysql的數(shù)據(jù)時,通過記錄消費binlog的位置,也就是Position,可以有效避免訂閱服務(wù)停機(jī)后,消費記錄丟失的問題。同步MongoDB時,通過記錄ts的值,來記錄消費的位置,可以到達(dá)和訂閱binlog一樣的效果。和mysql訂閱不同的是,MongoDB的同步需要同步服務(wù)自己查詢,而且oplog在MongoDB4.0之前的版本有大小限制,超過設(shè)置的容量后,老的數(shù)據(jù)就會被丟失,在4.0之后的版本已經(jīng)解除了這個限制。
代碼
上面已經(jīng)分析了oplog的結(jié)構(gòu)以及訂閱步驟,下面我們直接構(gòu)建查詢即可,需要注意,每次獲取到的ts值,需要存儲記錄下來,已便重新訂閱時,從上次斷開的記錄重新開始。下面直接看代碼,重點邏輯都以注釋詳盡
private BsonTimestamp queryTs; @Test public void OpLogTest() { MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb://admin:admin@127.0.0.1:3717")); MongoCollectioncollection = mongoClient.getDatabase("local") .getCollection("oplog.rs"); //如果是首次訂閱,需要使用自然排序查詢,獲取第最后一次操作的操作時間戳。如果是續(xù)訂閱直接讀取記錄的值賦值給queryTs即可 FindIterabletsCursor = collection.find().sort(new BasicDBObject("$natural", -1)) .limit(1); Document tsDoc = tsCursor.first(); queryTs = (BsonTimestamp) tsDoc.get("ts"); while (true) try { //構(gòu)建查詢語句,查詢大于當(dāng)前查詢時間戳queryTs的記錄 BasicDBObject query = new BasicDBObject("ts", new BasicDBObject("$gt", queryTs)); MongoCursordocCursor = collection.find(query) .cursorType(CursorType.TailableAwait) //沒有數(shù)據(jù)時阻塞休眠 .noCursorTimeout(true) //防止服務(wù)器在不活動時間(10分鐘)后使空閑的游標(biāo)超時。 .oplogReplay(true) //結(jié)合query條件,獲取增量數(shù)據(jù),這個參數(shù)比較難懂,見:https://docs.mongodb.com/manual/reference/command/find/index.html .maxAwaitTime(1, TimeUnit.SECONDS) //設(shè)置此操作在服務(wù)器上的最大等待執(zhí)行時間 .iterator(); while (docCursor.hasNext()) { Document document = docCursor.next(); //更新查詢時間戳 queryTs = (BsonTimestamp) document.get("ts"); //TODO 在這里接收到數(shù)據(jù)后通過訂閱數(shù)據(jù)路由分發(fā) String op = document.getString("op"); String database = document.getString("ns"); Document context = (Document) document.get("o"); Document where = null; if (op.equals("u")) { where = (Document) document.get("o2"); if (context != null) { context = (Document) context.get("$set"); } } System.err.println("操作時間戳:" + queryTs.getTime()); System.err.println("操作類 型:" + op); System.err.println("數(shù)據(jù)庫.集合:" + database); System.err.println("更新條件:" + JSON.toJSONString(where)); System.err.println("文檔內(nèi)容:" + JSON.toJSONString(context)); } } catch (Exception e) { e.printStackTrace(); } }
結(jié)語
上面代碼只是一個簡單的測試用例,完整的應(yīng)用還需要考慮ts的記錄更新,事件的抽象,數(shù)據(jù)的分發(fā)等。我們已經(jīng)開源的binlog訂閱分發(fā)項目目前支持?jǐn)?shù)據(jù)源在線管理,訂閱數(shù)據(jù)(庫、表)在線管理,如果能夠使用同一套管理后臺管理binlog和oplog的訂閱在好不過。要實現(xiàn)和binlog統(tǒng)一管理模型,配置和分發(fā)方面基本不需要改動,然后從頂層數(shù)據(jù)源方面做區(qū)分實現(xiàn)即可。
目前我們整合管理的功能都已經(jīng)開發(fā)好了,關(guān)于oplog部分的代碼還沒提交到github上,后面會和大家相見。
以上就是剖析后OpLog訂閱MongoDB的數(shù)據(jù)變更就沒那么難了的詳細(xì)內(nèi)容,更多關(guān)于OpLog訂閱MongoDB的數(shù)據(jù)變更的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Mongodb設(shè)置TTL索引自動清除數(shù)據(jù)與過期數(shù)據(jù)的方法
今天小編就為大家分享一篇關(guān)于Mongodb設(shè)置TTL索引自動清除數(shù)據(jù)與過期數(shù)據(jù)的方法,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03解決MongoDB占用內(nèi)存過大頻繁死機(jī)的方法詳解
這篇文章主要介紹了解決MongoDB占用內(nèi)存過大頻繁死機(jī)的方法詳解,需要的朋友可以參考下2020-02-02MongoDB下根據(jù)數(shù)組大小進(jìn)行查詢的方法
這篇文章主要介紹了MongoDB下根據(jù)數(shù)組大小進(jìn)行查詢的方法,分別實現(xiàn)了指定大小的數(shù)組和某個范圍的數(shù)組,需要的朋友可以參考下2014-04-04MongoDB進(jìn)階之動態(tài)字段設(shè)計詳解
這篇文章主要給大家介紹了MongoDB進(jìn)階之動態(tài)字段設(shè)計的相關(guān)資料,文中介紹的非常詳細(xì),對大家具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編一起來學(xué)習(xí)學(xué)習(xí)吧。2017-06-06Mongodb中MapReduce實現(xiàn)數(shù)據(jù)聚合方法詳解
Mongodb是針對大數(shù)據(jù)量環(huán)境下誕生的用于保存大數(shù)據(jù)量的非關(guān)系型數(shù)據(jù)庫,針對大量的數(shù)據(jù)。接下來通過本文給大家介紹Mongodb中MapReduce實現(xiàn)數(shù)據(jù)聚合方法詳解,感興趣的朋友一起學(xué)習(xí)吧2016-05-05MongoDB使用自帶的命令行工具進(jìn)行備份和恢復(fù)的教程
這篇文章主要介紹了MongoDB使用自帶的命令行工具進(jìn)行備份和恢復(fù)的教程,我們只需要在命令行界面中用簡單的命令操作mongorestore和mongodump工具就可以實現(xiàn),需要的朋友可以參考下2016-06-06