一文詳解GO如何實現(xiàn)Redis的AOF持久化
GO實現(xiàn)Redis的AOF持久化
將用戶發(fā)來的指令以RESP協(xié)議的形式存儲在本地的AOF文件,重啟Redis后執(zhí)行此文件恢復(fù)數(shù)據(jù)
https://github.com/csgopher/go-redis
本文涉及以下文件:redis.conf:配置文件
aof:實現(xiàn)aof
redis.conf
appendonly yes
appendfilename appendonly.aof
aof/aof.go
type CmdLine = [][]byte
const (
aofQueueSize = 1 << 16
)
type payload struct {
cmdLine CmdLine
dbIndex int
}
type AofHandler struct {
db databaseface.Database
aofChan chan *payload
aofFile *os.File
aofFilename string
currentDB int
}
func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
handler := &AofHandler{}
handler.aofFilename = config.Properties.AppendFilename
handler.db = db
handler.LoadAof()
aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
handler.aofFile = aofFile
handler.aofChan = make(chan *payload, aofQueueSize)
go func() {
handler.handleAof()
}()
return handler, nil
}
func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
if config.Properties.AppendOnly && handler.aofChan != nil {
handler.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
}
}
func (handler *AofHandler) handleAof() {
handler.currentDB = 0
for p := range handler.aofChan {
if p.dbIndex != handler.currentDB {
// select db
data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
continue
}
handler.currentDB = p.dbIndex
}
data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
}
}
func (handler *AofHandler) LoadAof() {
file, err := os.Open(handler.aofFilename)
if err != nil {
logger.Warn(err)
return
}
defer file.Close()
ch := parser.ParseStream(file)
fakeConn := &connection.Connection{}
for p := range ch {
if p.Err != nil {
if p.Err == io.EOF {
break
}
logger.Error("parse error: " + p.Err.Error())
continue
}
if p.Data == nil {
logger.Error("empty payload")
continue
}
r, ok := p.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
continue
}
ret := handler.db.Exec(fakeConn, r.Args)
if reply.IsErrorReply(ret) {
logger.Error("exec err", err)
}
}
}
- AofHandler:1.從管道中接收數(shù)據(jù) 2.寫入AOF文件
- AddAof:用戶的指令包裝成payload放入管道
- handleAof:將管道中的payload寫入磁盤
- LoadAof:重啟Redis后加載aof文件
database/database.go
type Database struct {
dbSet []*DB
aofHandler *aof.AofHandler
}
func NewDatabase() *Database {
mdb := &Database{}
if config.Properties.Databases == 0 {
config.Properties.Databases = 16
}
mdb.dbSet = make([]*DB, config.Properties.Databases)
for i := range mdb.dbSet {
singleDB := makeDB()
singleDB.index = i
mdb.dbSet[i] = singleDB
}
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAOFHandler(mdb)
if err != nil {
panic(err)
}
mdb.aofHandler = aofHandler
for _, db := range mdb.dbSet {
singleDB := db
singleDB.addAof = func(line CmdLine) {
mdb.aofHandler.AddAof(singleDB.index, line)
}
}
}
return mdb
}
將AOF加入到database里
使用singleDB的原因:因為在循環(huán)中獲取返回變量的地址都完全相同,因此當(dāng)我們想要訪問數(shù)組中元素所在的地址時,不應(yīng)該直接獲取 range 返回的變量地址 db,而應(yīng)該使用 singleDB := db
database/db.go
type DB struct {
index int
data dict.Dict
addAof func(CmdLine)
}
func makeDB() *DB {
db := &DB{
data: dict.MakeSyncDict(),
addAof: func(line CmdLine) {},
}
return db
}
由于分?jǐn)?shù)據(jù)庫db引用不到aof,所以添加一個addAof匿名函數(shù),在NewDatabase中用這個匿名函數(shù)調(diào)用AddAof
database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply {
......
if deleted > 0 {
db.addAof(utils.ToCmdLine2("del", args...))
}
return reply.MakeIntReply(int64(deleted))
}
func execFlushDB(db *DB, args [][]byte) resp.Reply {
db.Flush()
db.addAof(utils.ToCmdLine2("flushdb", args...))
return &reply.OkReply{}
}
func execRename(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("rename", args...))
return &reply.OkReply{}
}
func execRenameNx(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("renamenx", args...))
return reply.MakeIntReply(1)
}
database/string.go
func execSet(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("set", args...))
return &reply.OkReply{}
}
func execSetNX(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("setnx", args...))
return reply.MakeIntReply(int64(result))
}
func execGetSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity, exists := db.GetEntity(key)
db.PutEntity(key, &database.DataEntity{Data: value})
db.addAof(utils.ToCmdLine2("getset", args...))
......
}
添加addAof方法
測試命令
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n
到此這篇關(guān)于一文詳解GO如何實現(xiàn)Redis的AOF持久化的文章就介紹到這了,更多相關(guān)GO實現(xiàn)Redis AOF持久化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一文帶你掌握掌握 Golang結(jié)構(gòu)體與方法
在 Golang 中,結(jié)構(gòu)體和方法是實現(xiàn)面向?qū)ο缶幊痰闹匾M成部分,也是 Golang 的核心概念之一。在本篇文章中,我們將深入介紹 Golang 結(jié)構(gòu)體與方法的概念、使用方法以及相關(guān)的編程技巧和最佳實踐2023-04-04
Go語言函數(shù)的延遲調(diào)用(Deferred Code)詳解
本文將介紹Go語言函數(shù)和方法中的延遲調(diào)用,正如名稱一樣,這部分定義不會立即執(zhí)行,一般會在函數(shù)返回前再被調(diào)用,我們通過一些示例來了解一下延遲調(diào)用的使用場景2022-07-07
Go語言基本的語法和內(nèi)置數(shù)據(jù)類型初探
這篇文章主要介紹了Go語言基本的語法和內(nèi)置數(shù)據(jù)類型,是golang入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下2015-10-10
windows下使用vscode搭建golang環(huán)境并調(diào)試的過程
這篇文章主要介紹了在windows下使用vscode搭建golang環(huán)境并進(jìn)行調(diào)試,主要包括安裝方法及環(huán)境變量配置技巧,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-09-09

