亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

探索Golang實現(xiàn)Redis持久化AOF實例

 更新時間:2024年01月24日 09:38:59   作者:紹納?nullbody筆記  
這篇文章主要為大家介紹了Golang實現(xiàn)Redis持久化AOF實例探索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

引言

用11篇文章實現(xiàn)一個可用的Redis服務,姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關注學習。

[x] easyredis之TCP服務

[x] easyredis之網(wǎng)絡請求序列化協(xié)議(RESP)

[x] easyredis之內存數(shù)據(jù)庫

[x] easyredis之過期時間 (時間輪實現(xiàn))

[x] easyredis之持久化 (AOF實現(xiàn))

[ ] easyredis之發(fā)布訂閱功能

[ ] easyredis之有序集合(跳表實現(xiàn))

[ ] easyredis之 pipeline 客戶端實現(xiàn)

[ ] easyredis之事務(原子性/回滾)

[ ] easyredis之連接池

[ ] easyredis之分布式集群存儲

【第五篇】EasyRedis之持久化AOF

AOF全稱Append Only File,就是將寫相關的命令,追加保存到文件中,當服務器重啟以后,將文件中的命令在服務端重放(重新執(zhí)行恢復數(shù)據(jù)),實現(xiàn)的一種持久化方式。

本篇通過3個部分講解AOF的實現(xiàn):

  • AOF的寫入過程

  • AOF的加載過程

  • AOF的重寫過程

AOF的寫入過程

在核心的數(shù)據(jù)結構 Engine中新增一個 aof *AOF對象

// 存儲引擎,負責數(shù)據(jù)的CRUD
type Engine struct {
	// *DB
	dbSet []*atomic.Value
	// 時間輪(延遲任務)
	delay *timewheel.Delay
	// Append Only File
	aof *aof.AOF
}

在初始化函數(shù)func NewEngine() *Engine中,會基于是否啟用AOF日志,決定 aof *aof.AOF的初始化

func NewEngine() *Engine {
	//.....省略....
	// 啟用AOF日志
	if conf.GlobalConfig.AppendOnly {
		// 創(chuàng)建*AOF對象
		aof, err := aof.NewAOF(conf.GlobalConfig.AppendFilename, engine, true, conf.GlobalConfig.AppendFsync)
		if err != nil {
			panic(err)
		}
		engine.aof = aof
		// 設定每個db,使用aof寫入日志
		engine.aofBindEveryDB()
	}
	return engine
}

因為實際執(zhí)行redis命令的對象是 *DB,所以會對每個*DB對象設定db.writeAof函數(shù)指針

func (e *Engine) aofBindEveryDB() {
	for _, dbSet := range e.dbSet {
		db := dbSet.Load().(*DB)
		db.writeAof = func(redisCommand [][]byte) {
			if conf.GlobalConfig.AppendOnly {
                // 調用e.aof對象方法,保存命令
				e.aof.SaveRedisCommand(db.index, aof.Command(redisCommand))
			}
		}
	}
}

例如,當我們執(zhí)行 set key value命令的時候,實際會執(zhí)行 func cmdSet(db *DB, args [][]byte) protocol.Reply

func cmdSet(db *DB, args [][]byte) protocol.Reply {
    //.....省略....
	if result > 0 { // 1 表示存儲成功
		//TODO: 過期時間處理
		if ttl != nolimitedTTL { // 設定key過期
			expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
			db.ExpireAt(key, expireTime)
			//寫入日志
			db.writeAof(aof.SetCmd([][]byte{args[0], args[1]}...))
			db.writeAof(aof.PExpireAtCmd(string(args[0]), expireTime))
		} else { // 設定key不過期
			db.Persist(key)
			//寫入日志
			db.writeAof(aof.SetCmd(args...))
		}
		return protocol.NewOkReply()
	}
	return protocol.NewNullBulkReply()
}

可以看到,會調用上面剛才設定的db.writeAof函數(shù),將當前的命令保存到AOF中。所以我們實際看下 SaveRedisCommand函數(shù)中具體在做什么事情。代碼路徑位于aof/aof.go

func (aof *AOF) SaveRedisCommand(dbIndex int, command Command) {
	// 關閉
	if aof.atomicClose.Load() {
		return
	}
	// 寫入文件 & 刷盤
	if aof.aofFsync == FsyncAlways {
		record := aofRecord{
			dbIndex: dbIndex,
			command: command,
		}
		aof.writeAofRecord(record)
		return
	}
	// 寫入緩沖
	aof.aofChan <- aofRecord{
		dbIndex: dbIndex,
		command: command,
	}
}

因為AOF的刷盤(Sync)有三種模式:

  • 寫入 & 立即刷盤

  • 寫入 & 每秒刷盤

  • 寫入 & 不主動刷盤(取決于操作系統(tǒng)自動刷盤)

如果配置的是always模式,會立即執(zhí)行aof.writeAofRecord(record);否則就將數(shù)據(jù)先保存在緩沖aof.aofChan中(這里其實又是生產(chǎn)者消費者模型)最后在消費協(xié)程中,執(zhí)行寫入

func (aof *AOF) watchChan() {
	for record := range aof.aofChan {
		aof.writeAofRecord(record)
	}
	aof.aofFinished <- struct{}{}
}

所以我們只需要看下 writeAofRecord函數(shù)即可,其實就是把命令按照Redis 序列化協(xié)議的格式,寫入到文件中。給大家看下更直觀的演示圖:

再看下在 append.aof文件中具體的數(shù)據(jù)格式:

這里有個很重要點:因為AOF文件是所有的*DB對象復用的文件,寫入的redis命令歸屬于不同的數(shù)據(jù)庫的

舉個例子: 比如在0號數(shù)據(jù)庫,我們執(zhí)行set key value,在3號數(shù)據(jù)庫,我們執(zhí)行set key value,在日志文件中會記錄兩條命令,但是這兩個命令其實是不同數(shù)據(jù)庫的命令。在恢復命令到數(shù)據(jù)庫的時候,應該在不同的數(shù)據(jù)庫中執(zhí)行該命令。所以在記錄命令的時候,我們還要記錄下他的數(shù)據(jù)庫是什么?這樣恢復的時候,才能知道命令的數(shù)據(jù)庫的歸屬問題。

func (aof *AOF) writeAofRecord(record aofRecord) {
	aof.mu.Lock()
	defer aof.mu.Unlock()
	// 因為aof對象是所有數(shù)據(jù)庫對象【復用】寫入文件方法,每個數(shù)據(jù)庫的索引不同
	// 所以,每個命令的執(zhí)行,有個前提就是操作的不同的數(shù)據(jù)庫
	if record.dbIndex != aof.lastDBIndex {
		// 構建select index 命令 & 寫入文件
		selectCommand := [][]byte{[]byte("select"), []byte(strconv.Itoa(record.dbIndex))}
		data := protocol.NewMultiBulkReply(selectCommand).ToBytes()
		_, err := aof.aofFile.Write(data)
		if err != nil {
			logger.Warn(err)
			return
		}
		aof.lastDBIndex = record.dbIndex
	}
	// redis命令
	data := protocol.NewMultiBulkReply(record.command).ToBytes()
	_, err := aof.aofFile.Write(data)
	if err != nil {
		logger.Warn(err)
	}
	logger.Debugf("write aof command:%q", data)
	// 每次寫入刷盤
	if aof.aofFsync == FsyncAlways {
		aof.aofFile.Sync()
	}
}

AOF的加載過程

在服務啟動的時候,將*.aof文件中的命令,在服務端進行重放,效果演示如下:

代碼路徑位于aof/aof.go

// 構建AOF對象
func NewAOF(aofFileName string, engine abstract.Engine, load bool, fsync string) (*AOF, error) {
	//...省略...
	// 啟動加載aof文件
	if load {
		aof.LoadAof(0)
	}
    //...省略...
}

aof.LoadAof(0)函數(shù)的本質就是從文件中,按照行讀取數(shù)據(jù)。如果看過之前的文章,這里其實復用了parser.ParseStream(reader)函數(shù),負責從文件解析redis序列化協(xié)議格式的命令,最后利用數(shù)據(jù)庫引擎,將命令數(shù)據(jù)保存到內存中(命令重放)

func (aof *AOF) LoadAof(maxBytes int) {
	// 目的:當加載aof文件的時候,因為需要復用engine對象,內部重放命令的時候會自動寫aof日志,加載aof 禁用 SaveRedisCommand的寫入
	aof.atomicClose.Store(true)
	deferfunc() {
		aof.atomicClose.Store(false)
	}()
	// 只讀打開文件
	file, err := os.Open(aof.aofFileName)
	if err != nil {
		logger.Error(err.Error())
		return
	}
	defer file.Close()
	file.Seek(0, io.SeekStart)
	var reader io.Reader
	if maxBytes > 0 { // 限定讀取的字節(jié)大小
		reader = io.LimitReader(file, int64(maxBytes))
	} else { // 不限定,直接讀取到文件結尾(為止)
		reader = file
	}
	// 文件中保存的格式和網(wǎng)絡傳輸?shù)母袷揭恢?
	ch := parser.ParseStream(reader)
	virtualConn := connection.NewVirtualConn()
	for payload := range ch {
		if payload.Err != nil {
			// 文件已經(jīng)讀取到“完成“
			if payload.Err == io.EOF {
				break
			}
			// 讀取到非法的格式
			logger.Errorf("LoadAof parser error %+v:", payload.Err)
			continue
		}
		if payload.Reply == nil {
			logger.Error("empty payload data")
			continue
		}
		// 從文件中讀取到命令
		reply, ok := payload.Reply.(*protocol.MultiBulkReply)
		if !ok {
			logger.Error("require multi bulk protocol")
			continue
		}
		// 利用數(shù)據(jù)庫引擎,將命令數(shù)據(jù)保存到內存中(命令重放)
		ret := aof.engine.Exec(virtualConn, reply.RedisCommand)
		// 判斷是否執(zhí)行失敗
		if protocol.IsErrReply(ret) {
			logger.Error("exec err ", string(ret.ToBytes()))
		}
		// 判斷命令是否是"select"
		if strings.ToLower(string(reply.RedisCommand[0])) == "select" {
			dbIndex, err := strconv.Atoi(string(reply.RedisCommand[1]))
			if err == nil {
				aof.lastDBIndex = dbIndex // 記錄下數(shù)據(jù)恢復過程中,選中的數(shù)據(jù)庫索引
			}
		}
	}
}

AOF的重寫過程

代碼路徑aof/rewrite.go重寫的過程就是下面的函數(shù)

func (aof *AOF) Rewrite(engine abstract.Engine) {
	//1.對現(xiàn)有的aof文件做一次快照
	snapShot, err := aof.startRewrite()
	if err != nil {
		logger.Errorf("StartRewrite err: %+v", err)
		return
	}
	//2. 將現(xiàn)在的aof文件數(shù)據(jù),加在到新(內存)對象中,并重寫入新aof文件中
	err = aof.doRewrite(snapShot, engine)
	if err != nil {
		logger.Errorf("doRewrite err: %+v", err)
		return
	}
	//3. 將重寫過程中的增量命令寫入到新文件中
	err = aof.finishRewrite(snapShot)
	if err != nil {
		logger.Errorf("finishRewrite err: %+v", err)
	}
}

整個的處理思想很重要:如下圖

總結

代碼的思路應該還是比較清晰,但是細節(jié)上的處理非常容易讓人大腦宕機。建議還是看下源碼,邊看邊自己敲一下,感受是不一樣

項目代碼地址: https://github.com/gofish2020/easyredis 

以上就是探索Golang實現(xiàn)Redis持久化AOF實例的詳細內容,更多關于Golang Redis持久化AOF的資料請關注腳本之家其它相關文章!

相關文章

  • Go語言單元測試超詳細解析

    Go語言單元測試超詳細解析

    本文介紹了了Go語言單元測試超詳細解析,測試函數(shù)分為函數(shù)的基本測試、函數(shù)的組測試、函數(shù)的子測試,進行基準測試時往往是對函數(shù)的算法進行測驗,有時后一個算法在測試數(shù)據(jù)的基量不同時測試出的效果會不同我們需要對不同數(shù)量級的樣本進行測試,下文需要的朋友可以參考下
    2022-02-02
  • Golang性能優(yōu)化的技巧分享

    Golang性能優(yōu)化的技巧分享

    性能優(yōu)化的前提是滿足正確可靠、簡潔清晰等質量因素,針對?Go語言特性,本文為大家整理了一些Go語言相關的性能優(yōu)化建議,感興趣的可以了解一下
    2023-07-07
  • Golang中crypto/ecdsa庫實現(xiàn)數(shù)字簽名和驗證

    Golang中crypto/ecdsa庫實現(xiàn)數(shù)字簽名和驗證

    本文主要介紹了Golang中crypto/ecdsa庫實現(xiàn)數(shù)字簽名和驗證,將從ECDSA的基本原理出發(fā),詳細解析如何在Go語言中實現(xiàn)數(shù)字簽名和驗證,具有一定的參考價值,感興趣的可以了解一下
    2024-02-02
  • GoFrame實現(xiàn)順序性校驗示例詳解

    GoFrame實現(xiàn)順序性校驗示例詳解

    這篇文章主要為大家介紹了GoFrame實現(xiàn)順序性校驗示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-06-06
  • golang開發(fā)?gorilla?websocket的使用示例詳解

    golang開發(fā)?gorilla?websocket的使用示例詳解

    這篇文章主要介紹了golang開發(fā)?gorilla?websocket的使用示例詳解,介紹了websocket的簡單使用,我們使用的版本是1.3.0,具體操作方法跟隨小編一起學習吧
    2024-05-05
  • Go語言使用kafka-go實現(xiàn)Kafka消費消息

    Go語言使用kafka-go實現(xiàn)Kafka消費消息

    本篇文章主要介紹了使用kafka-go庫消費Kafka消息,包含F(xiàn)etchMessage和ReadMessage的區(qū)別和適用場景,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的可以了解一下
    2024-12-12
  • Golang中goroutine和channel使用介紹深入分析

    Golang中goroutine和channel使用介紹深入分析

    一次只做一件事情并不是完成任務最快的方法,一些大的任務可以拆解成若干個小任務,goroutine可以讓程序同時處理幾個不同的任務,goroutine使用channel來協(xié)調它們的工作,channel允許goroutine互相發(fā)送數(shù)據(jù)并同步,這樣一個goroutine就不會領先于另一個goroutine
    2023-01-01
  • Go并發(fā)原語之SingleFlight請求合并方法實例

    Go并發(fā)原語之SingleFlight請求合并方法實例

    本文我們來學習一下 Go 語言的擴展并發(fā)原語:SingleFlight,SingleFlight 的作用是將并發(fā)請求合并成一個請求,以減少重復的進程來優(yōu)化 Go 代碼
    2023-12-12
  • Golang函數(shù)重試機制實現(xiàn)代碼

    Golang函數(shù)重試機制實現(xiàn)代碼

    在編寫應用程序時,有時候會遇到一些短暫的錯誤,例如網(wǎng)絡請求、服務鏈接終端失敗等,這些錯誤可能導致函數(shù)執(zhí)行失敗,這篇文章主要介紹了Golang函數(shù)重試機制實現(xiàn)代碼,需要的朋友可以參考下
    2024-04-04
  • go實現(xiàn)thrift的網(wǎng)絡傳輸性能及需要注意問題示例解析

    go實現(xiàn)thrift的網(wǎng)絡傳輸性能及需要注意問題示例解析

    這篇文章主要為大家介紹了go實現(xiàn)thrift的網(wǎng)絡傳輸性能及需要注意問題示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-09-09

最新評論