探索Golang實現(xiàn)Redis持久化AOF實例
引言
用11篇文章實現(xiàn)一個可用的Redis服務,姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關注學習。
[x] easyredis之TCP服務
[x] easyredis之網(wǎng)絡請求序列化協(xié)議(RESP)
[x] easyredis之內存數(shù)據(jù)庫
[x] easyredis之過期時間 (時間輪實現(xiàn))
[x] easyredis之持久化 (AOF實現(xiàn))
[ ] easyredis之發(fā)布訂閱功能
[ ] easyredis之有序集合(跳表實現(xiàn))
[ ] easyredis之 pipeline 客戶端實現(xiàn)
[ ] easyredis之事務(原子性/回滾)
[ ] easyredis之連接池
[ ] easyredis之分布式集群存儲
【第五篇】EasyRedis之持久化AOF
AOF
全稱Append Only File
,就是將寫相關的命令,追加保存到文件中,當服務器重啟以后,將文件中的命令在服務端重放(重新執(zhí)行恢復數(shù)據(jù)),實現(xiàn)的一種持久化方式。
本篇通過3個部分講解AOF的實現(xiàn):
AOF的寫入過程
AOF的加載過程
AOF的重寫過程
AOF的寫入過程
在核心的數(shù)據(jù)結構 Engine
中新增一個 aof *AOF
對象
// 存儲引擎,負責數(shù)據(jù)的CRUD type Engine struct { // *DB dbSet []*atomic.Value // 時間輪(延遲任務) delay *timewheel.Delay // Append Only File aof *aof.AOF }
在初始化函數(shù)func NewEngine() *Engine
中,會基于是否啟用AOF日志,決定 aof *aof.AOF
的初始化
func NewEngine() *Engine { //.....省略.... // 啟用AOF日志 if conf.GlobalConfig.AppendOnly { // 創(chuàng)建*AOF對象 aof, err := aof.NewAOF(conf.GlobalConfig.AppendFilename, engine, true, conf.GlobalConfig.AppendFsync) if err != nil { panic(err) } engine.aof = aof // 設定每個db,使用aof寫入日志 engine.aofBindEveryDB() } return engine }
因為實際執(zhí)行redis
命令的對象是 *DB
,所以會對每個*DB
對象設定db.writeAof
函數(shù)指針
func (e *Engine) aofBindEveryDB() { for _, dbSet := range e.dbSet { db := dbSet.Load().(*DB) db.writeAof = func(redisCommand [][]byte) { if conf.GlobalConfig.AppendOnly { // 調用e.aof對象方法,保存命令 e.aof.SaveRedisCommand(db.index, aof.Command(redisCommand)) } } } }
例如,當我們執(zhí)行 set key value
命令的時候,實際會執(zhí)行 func cmdSet(db *DB, args [][]byte) protocol.Reply
func cmdSet(db *DB, args [][]byte) protocol.Reply { //.....省略.... if result > 0 { // 1 表示存儲成功 //TODO: 過期時間處理 if ttl != nolimitedTTL { // 設定key過期 expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond) db.ExpireAt(key, expireTime) //寫入日志 db.writeAof(aof.SetCmd([][]byte{args[0], args[1]}...)) db.writeAof(aof.PExpireAtCmd(string(args[0]), expireTime)) } else { // 設定key不過期 db.Persist(key) //寫入日志 db.writeAof(aof.SetCmd(args...)) } return protocol.NewOkReply() } return protocol.NewNullBulkReply() }
可以看到,會調用上面剛才設定的db.writeAof
函數(shù),將當前的命令保存到AOF中。所以我們實際看下 SaveRedisCommand
函數(shù)中具體在做什么事情。代碼路徑位于aof/aof.go
func (aof *AOF) SaveRedisCommand(dbIndex int, command Command) { // 關閉 if aof.atomicClose.Load() { return } // 寫入文件 & 刷盤 if aof.aofFsync == FsyncAlways { record := aofRecord{ dbIndex: dbIndex, command: command, } aof.writeAofRecord(record) return } // 寫入緩沖 aof.aofChan <- aofRecord{ dbIndex: dbIndex, command: command, } }
因為AOF的刷盤(Sync)有三種模式:
寫入 & 立即刷盤
寫入 & 每秒刷盤
寫入 & 不主動刷盤(取決于操作系統(tǒng)自動刷盤)
如果配置的是always
模式,會立即執(zhí)行aof.writeAofRecord(record)
;否則就將數(shù)據(jù)先保存在緩沖aof.aofChan
中(這里其實又是生產(chǎn)者消費者模型)最后在消費協(xié)程中,執(zhí)行寫入
func (aof *AOF) watchChan() { for record := range aof.aofChan { aof.writeAofRecord(record) } aof.aofFinished <- struct{}{} }
所以我們只需要看下 writeAofRecord
函數(shù)即可,其實就是把命令按照Redis 序列化協(xié)議
的格式,寫入到文件中。給大家看下更直觀的演示圖:
再看下在 append.aof
文件中具體的數(shù)據(jù)格式:
這里有個很重要點:因為AOF文件是所有的*DB
對象復用的文件,寫入的redis命令歸屬于不同的數(shù)據(jù)庫的
舉個例子: 比如在0號數(shù)據(jù)庫,我們執(zhí)行set key value
,在3號數(shù)據(jù)庫,我們執(zhí)行set key value
,在日志文件中會記錄兩條命令,但是這兩個命令其實是不同數(shù)據(jù)庫的命令。在恢復命令到數(shù)據(jù)庫的時候,應該在不同的數(shù)據(jù)庫中執(zhí)行該命令。所以在記錄命令的時候,我們還要記錄下他的數(shù)據(jù)庫是什么?這樣恢復的時候,才能知道命令的數(shù)據(jù)庫的歸屬問題。
func (aof *AOF) writeAofRecord(record aofRecord) { aof.mu.Lock() defer aof.mu.Unlock() // 因為aof對象是所有數(shù)據(jù)庫對象【復用】寫入文件方法,每個數(shù)據(jù)庫的索引不同 // 所以,每個命令的執(zhí)行,有個前提就是操作的不同的數(shù)據(jù)庫 if record.dbIndex != aof.lastDBIndex { // 構建select index 命令 & 寫入文件 selectCommand := [][]byte{[]byte("select"), []byte(strconv.Itoa(record.dbIndex))} data := protocol.NewMultiBulkReply(selectCommand).ToBytes() _, err := aof.aofFile.Write(data) if err != nil { logger.Warn(err) return } aof.lastDBIndex = record.dbIndex } // redis命令 data := protocol.NewMultiBulkReply(record.command).ToBytes() _, err := aof.aofFile.Write(data) if err != nil { logger.Warn(err) } logger.Debugf("write aof command:%q", data) // 每次寫入刷盤 if aof.aofFsync == FsyncAlways { aof.aofFile.Sync() } }
AOF的加載過程
在服務啟動的時候,將*.aof
文件中的命令,在服務端進行重放,效果演示如下:
代碼路徑位于aof/aof.go
// 構建AOF對象 func NewAOF(aofFileName string, engine abstract.Engine, load bool, fsync string) (*AOF, error) { //...省略... // 啟動加載aof文件 if load { aof.LoadAof(0) } //...省略... }
aof.LoadAof(0)
函數(shù)的本質就是從文件中,按照行讀取數(shù)據(jù)。如果看過之前的文章,這里其實復用了parser.ParseStream(reader)
函數(shù),負責從文件解析redis序列化協(xié)議
格式的命令,最后利用數(shù)據(jù)庫引擎,將命令數(shù)據(jù)保存到內存中(命令重放)
func (aof *AOF) LoadAof(maxBytes int) { // 目的:當加載aof文件的時候,因為需要復用engine對象,內部重放命令的時候會自動寫aof日志,加載aof 禁用 SaveRedisCommand的寫入 aof.atomicClose.Store(true) deferfunc() { aof.atomicClose.Store(false) }() // 只讀打開文件 file, err := os.Open(aof.aofFileName) if err != nil { logger.Error(err.Error()) return } defer file.Close() file.Seek(0, io.SeekStart) var reader io.Reader if maxBytes > 0 { // 限定讀取的字節(jié)大小 reader = io.LimitReader(file, int64(maxBytes)) } else { // 不限定,直接讀取到文件結尾(為止) reader = file } // 文件中保存的格式和網(wǎng)絡傳輸?shù)母袷揭恢? ch := parser.ParseStream(reader) virtualConn := connection.NewVirtualConn() for payload := range ch { if payload.Err != nil { // 文件已經(jīng)讀取到“完成“ if payload.Err == io.EOF { break } // 讀取到非法的格式 logger.Errorf("LoadAof parser error %+v:", payload.Err) continue } if payload.Reply == nil { logger.Error("empty payload data") continue } // 從文件中讀取到命令 reply, ok := payload.Reply.(*protocol.MultiBulkReply) if !ok { logger.Error("require multi bulk protocol") continue } // 利用數(shù)據(jù)庫引擎,將命令數(shù)據(jù)保存到內存中(命令重放) ret := aof.engine.Exec(virtualConn, reply.RedisCommand) // 判斷是否執(zhí)行失敗 if protocol.IsErrReply(ret) { logger.Error("exec err ", string(ret.ToBytes())) } // 判斷命令是否是"select" if strings.ToLower(string(reply.RedisCommand[0])) == "select" { dbIndex, err := strconv.Atoi(string(reply.RedisCommand[1])) if err == nil { aof.lastDBIndex = dbIndex // 記錄下數(shù)據(jù)恢復過程中,選中的數(shù)據(jù)庫索引 } } } }
AOF的重寫過程
代碼路徑aof/rewrite.go
重寫的過程就是下面的函數(shù)
func (aof *AOF) Rewrite(engine abstract.Engine) { //1.對現(xiàn)有的aof文件做一次快照 snapShot, err := aof.startRewrite() if err != nil { logger.Errorf("StartRewrite err: %+v", err) return } //2. 將現(xiàn)在的aof文件數(shù)據(jù),加在到新(內存)對象中,并重寫入新aof文件中 err = aof.doRewrite(snapShot, engine) if err != nil { logger.Errorf("doRewrite err: %+v", err) return } //3. 將重寫過程中的增量命令寫入到新文件中 err = aof.finishRewrite(snapShot) if err != nil { logger.Errorf("finishRewrite err: %+v", err) } }
整個的處理思想很重要:如下圖
總結
代碼的思路應該還是比較清晰,但是細節(jié)上的處理非常容易讓人大腦宕機。建議還是看下源碼,邊看邊自己敲一下,感受是不一樣
項目代碼地址: https://github.com/gofish2020/easyredis
以上就是探索Golang實現(xiàn)Redis持久化AOF實例的詳細內容,更多關于Golang Redis持久化AOF的資料請關注腳本之家其它相關文章!
相關文章
Golang中crypto/ecdsa庫實現(xiàn)數(shù)字簽名和驗證
本文主要介紹了Golang中crypto/ecdsa庫實現(xiàn)數(shù)字簽名和驗證,將從ECDSA的基本原理出發(fā),詳細解析如何在Go語言中實現(xiàn)數(shù)字簽名和驗證,具有一定的參考價值,感興趣的可以了解一下2024-02-02golang開發(fā)?gorilla?websocket的使用示例詳解
這篇文章主要介紹了golang開發(fā)?gorilla?websocket的使用示例詳解,介紹了websocket的簡單使用,我們使用的版本是1.3.0,具體操作方法跟隨小編一起學習吧2024-05-05Go語言使用kafka-go實現(xiàn)Kafka消費消息
本篇文章主要介紹了使用kafka-go庫消費Kafka消息,包含F(xiàn)etchMessage和ReadMessage的區(qū)別和適用場景,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的可以了解一下2024-12-12Golang中goroutine和channel使用介紹深入分析
一次只做一件事情并不是完成任務最快的方法,一些大的任務可以拆解成若干個小任務,goroutine可以讓程序同時處理幾個不同的任務,goroutine使用channel來協(xié)調它們的工作,channel允許goroutine互相發(fā)送數(shù)據(jù)并同步,這樣一個goroutine就不會領先于另一個goroutine2023-01-01Go并發(fā)原語之SingleFlight請求合并方法實例
本文我們來學習一下 Go 語言的擴展并發(fā)原語:SingleFlight,SingleFlight 的作用是將并發(fā)請求合并成一個請求,以減少重復的進程來優(yōu)化 Go 代碼2023-12-12go實現(xiàn)thrift的網(wǎng)絡傳輸性能及需要注意問題示例解析
這篇文章主要為大家介紹了go實現(xiàn)thrift的網(wǎng)絡傳輸性能及需要注意問題示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09