亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

使用go語(yǔ)言實(shí)現(xiàn)Redis持久化的示例代碼

 更新時(shí)間:2024年07月14日 08:26:11   作者:uccs  
redis 是一個(gè)內(nèi)存數(shù)據(jù)庫(kù),如果你把進(jìn)程殺掉,那么里面存儲(chǔ)的數(shù)據(jù)都會(huì)消失,那么這篇文章就是來(lái)解決 redis 持久化的問(wèn)題,本文給大家介紹了使用go語(yǔ)言實(shí)現(xiàn)Redis持久化,需要的朋友可以參考下

redis 是一個(gè)內(nèi)存數(shù)據(jù)庫(kù),如果你把進(jìn)程殺掉,那么里面存儲(chǔ)的數(shù)據(jù)都會(huì)消失,那么這篇文章就是來(lái)解決 redis 持久化的問(wèn)題

我們?cè)?nbsp;redis.conf 文件中增加兩個(gè)配置

appendonly yes
appendfilename appendonly.aof
  • appenonly 表示只追加
  • appendfilename 表示追加到那什么文件中

指令: *3\r\n$3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nvalue\r\n 落在 appendonly.aof 文件中

*3
$3
SET
$3
KEY
$5
value

這里要實(shí)現(xiàn)的功能就是把用戶(hù)發(fā)過(guò)來(lái)的指令,用 RESP 的形式記錄在 appendonly.aof 文件中

這個(gè)文件是在機(jī)器的硬盤(pán)上,當(dāng) redis 停了之后,內(nèi)存中的數(shù)據(jù)都沒(méi)了,但這個(gè)文件會(huì)保存下

redis 重啟后,會(huì)讀取這個(gè)文件,把之前內(nèi)存中的數(shù)據(jù)再次加載回來(lái)

定義 AofHandler

在項(xiàng)目下新建文件 aof/aof.go

在里面定義一個(gè) AofHandler 結(jié)構(gòu)體,它的作用就是用來(lái)處理 appendonly.aof 文件

type AofHandler struct {
	database    databaseface.Database // 持有 db,db 有業(yè)務(wù)核心
	aofFile     *os.File              // 持有 aof 文件
	aofFilename string                // aof 文件名
	currentDB   int                   // 當(dāng)前 db
	aofChan     chan *payload         // 寫(xiě)文件的緩沖區(qū)
}

這里有注意的是 aofChan,它是寫(xiě)文件的緩沖區(qū)

因?yàn)閺奈募凶x取指令,指令是非常密集的,但是將指令寫(xiě)入硬盤(pán)時(shí)非常慢的,我們又不可能每次都等待指令寫(xiě)完成后再去操作 redis

這時(shí)我們就把所有想寫(xiě)入 aof 文件的指令放到 aofChan 中,然后在另一個(gè) goroutine 中去寫(xiě)入硬盤(pán)

所以這個(gè) aofChan 的類(lèi)型是 payload 結(jié)構(gòu)體

type CmdLine = [][]byte
type payload struct {
  cmdLine CmdLine // 指令
  dbIndex int     // db 索引
}

AofHandler 結(jié)構(gòu)體定義好之后,我們需要定義一個(gè) NewAofHandler 函數(shù)來(lái)初始化 AofHandler 結(jié)構(gòu)體

還需要定義一個(gè) AddAof 方法,用來(lái)往 aofChan 中添加指令

放到緩沖區(qū)之后,還需要一個(gè)方法 HandleAof 將指令寫(xiě)入硬盤(pán)

最后還要實(shí)現(xiàn)一個(gè)從硬盤(pán)加載 aof 文件到內(nèi)存的的函數(shù) LoadAof

實(shí)現(xiàn) NewAofHandler

NewAofHandler 函數(shù)用來(lái)初始化 AofHandler 結(jié)構(gòu)體

func NewAofHandler(database databaseface.Database) (*AofHandler, error) {
  // 初始化 AofHandler 結(jié)構(gòu)體
  handler := &AofHandler{}
  // 從配置文件中讀取 aof 文件名
  handler.aofFilename = config.Properties.AppendFilename
  // 持有 db
  handler.database = database
  // 從硬盤(pán)加載 aof 文件
  handler.LoadAof()
  // 打開(kāi) aof 文件, 如果不存在則創(chuàng)建
  aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
  if err != nil {
    return nil, err
  }
  // 持有 aof 文件
  handler.aofFile = aofFile
  // 初始化 aofChan
  handler.aofChan = make(chan *payload, aofBufferSize)
  // 啟動(dòng)一個(gè) goroutine 處理 aofChan
  go func() {
    handler.HandleAof()
  }()
  // 返回 AofHandler 結(jié)構(gòu)體
  return handler, nil
}

實(shí)現(xiàn) AddAof

AddAof 方法用來(lái)往 aofChan 中添加指令,它不做落盤(pán)的操作

因?yàn)樵趫?zhí)行指令的時(shí)候,等待它落盤(pán)的話,效率太低了,所以我們把指令放到 aofChan 中,然后在另一個(gè) goroutine 中去處理

func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
  // 如果配置文件中的 appendonly 為 true 并且 aofChan 不為 nil
  if config.Properties.AppendOnly && handler.aofChan != nil {
    // 往 aofChan 中添加指令
    handler.aofChan <- &payload{
      cmdLine: cmdLine,
      dbIndex: dbIndex,
    }
  }
}

實(shí)現(xiàn) HandleAof

HandleAof 方法用來(lái)處理 aofChan 中的指令,將指令寫(xiě)入硬盤(pán)

currentDB 記錄的是當(dāng)前工作的 DB,如果切換了 DB,會(huì)在 aof 文件中插入 select 0 這樣切換 DB 的語(yǔ)句

func (handler *AofHandler) HandleAof() {
  // 初始化 currentDB
  handler.currentDB = 0
  // 遍歷 chan
  for p := range handler.aofChan {
    // 如果當(dāng)前 db 不等于上一次工作的 db,就要插入一條 select 語(yǔ)句
    if p.dbIndex != handler.currentDB {
      // 我們要把 select 0 編碼成 RESP 格式
      // 也就是 *2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n
      data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
      // 寫(xiě)入 aof 文件
      _, err := handler.aofFile.Write(data)
      if err != nil {
        logger.Warn(err)
        continue
      }
      // 更新 currentDB
      handler.currentDB = p.dbIndex
    }
    // 這里是插入正常的指令
    data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
    // 寫(xiě)入 aof 文件
    _, err := handler.aofFile.Write(data)
    if err != nil {
      logger.Warn(err)
    }
  }
}

實(shí)現(xiàn) Aof 落盤(pán)功能

我們之前在實(shí)現(xiàn)指令的部分,都是直接執(zhí)行指令,現(xiàn)在我們要把指令寫(xiě)入 aof 文件

我們?cè)?nbsp;StandaloneDatabase 結(jié)構(gòu)體中增加一個(gè) aofHandler 字段

type StandaloneDatabase struct {
  dbSet      []*DB
  aofHandler *aof.AofHandler // 增加落盤(pán)功能
}

然后新建 database 時(shí)需要對(duì) aofHandler 進(jìn)行初始化

func NewStandaloneDatabase() *StandaloneDatabase {
  // ...
  // 先看下配置文件中的 appendonly 是否為 true
  if config.Properties.AppendOnly {
    // 初始化 aofHandler
    aofHandler, err := aof.NewAofHandler(database)
    if err != nil {
      panic(err)
    }
    // 持有 aofHandler
    database.aofHandler = aofHandler
    // 遍歷 dbSet
    for _, db := range database.dbSet {
      // 解決閉包問(wèn)題
      sdb := db
      // 為每個(gè) db 添加 AddAof 方法
      // 這個(gè) addAof 方法是在執(zhí)行指令的時(shí)候調(diào)用的
      sdb.addAof = func(line CmdLine) {
        database.aofHandler.AddAof(sdb.index, line)
      }
    }
  }
  return database
}

這里要注意的是 addAof 方法,它是在執(zhí)行指令的時(shí)候調(diào)用的

因?yàn)槲覀冃枰谥噶钪姓{(diào)用 Addaof 函數(shù),實(shí)現(xiàn)指令寫(xiě)入 aof 文件

但是在指令中,???們只能拿到 db,db 上又沒(méi)有操作 aof 相關(guān)的方法,所以我們需要在 db 中增加一個(gè) addAof 方法

type DB struct {
	index  int           // 數(shù)據(jù)的編號(hào)
	data   dict.Dict     // 數(shù)據(jù)類(lèi)型
	addAof func(CmdLine) // 每個(gè) db 都有一個(gè) addAof 方法
}

然后就在需要落盤(pán)的指令中調(diào)用 addAof 方法

DEL 方法需要記錄下來(lái),因?yàn)?nbsp;DEL 方法是刪除數(shù)據(jù)的,如果不記錄下來(lái),那么 aof 文件中的數(shù)據(jù)就會(huì)和內(nèi)存中的數(shù)據(jù)不一致

// DEL K1 K2 K3
func DEL(db *DB, args [][]byte) resp.Reply {
  deleted := db.Removes(keys...)
  // delete 大于 0 說(shuō)明有數(shù)據(jù)被刪除
  if deleted > 0 {
    db.addAof(utils.ToCmdLine2("DEL", args...))
  }
}

FLUSHDB 方法也需要記錄下來(lái),因?yàn)?nbsp;FLUSHDB 方法是刪除當(dāng)前 DB 中的所有數(shù)據(jù)

// FLUSHDB
func FLUSHDB(db *DB, args [][]byte) resp.Reply {
	db.addAof(utils.ToCmdLine2("FLUSEHDB", args...))
}

RENAME 和 RENAMENX 方法也需要記錄下來(lái),因?yàn)檫@兩個(gè)方法是修改 key 的名字

// RENAME K1 K2
func RENAME(db *DB, args [][]byte) resp.Reply {
  db.addAof(utils.ToCmdLine2("RENAME", args...))
}

// RENAMENX K1 K2
func RENAMENX(db *DB, args [][]byte) resp.Reply {
  db.addAof(utils.ToCmdLine2("RENAMENX", args...))
}

SET 和 SETNX 方法也需要記錄下來(lái),因?yàn)檫@兩個(gè)方法是設(shè)置數(shù)據(jù)的

// SET K1 v
func SET(db *DB, args [][]byte) resp.Reply {
  db.addAof(utils.ToCmdLine2("SET", args...))
}

// SETNX K1 v
func SETNX(db *DB, args [][]byte) resp.Reply {
  db.addAof(utils.ToCmdLine2("SETNX", args...))
}

GETSET 方法也需要記錄下來(lái),因?yàn)檫@個(gè)方法是設(shè)置數(shù)據(jù)的同時(shí)返回舊數(shù)據(jù)

// GETSET K1 v1
func GETSET(db *DB, args [][]byte) resp.Reply {
  db.addAof(utils.ToCmdLine2("GETSET", args...))
}

實(shí)現(xiàn) LoadAof

LoadAof 方法用來(lái)從硬盤(pán)加載 aof 文件到內(nèi)存

aof 中的指令是符合 RESP 協(xié)議的,我們就可以把這些指令當(dāng)成用戶(hù)發(fā)過(guò)來(lái)的指令,執(zhí)行就可以了

func (handler *AofHandler) LoadAof() {
  // 打開(kāi) aof 文件
  file, err := os.Open(handler.aofFilename)
  if err != nil {
    logger.Error(err)
    return
  }
  // 關(guān)閉文件
  defer func() {
    _ = file.Close()
  }()
  // 創(chuàng)建一個(gè) RESP 解析器,將 file 傳入,解析后的指令會(huì)放到 chan 中
  ch := parser.ParseStream(file)
  fackConn := &connection.Connection{}
  // 遍歷 chan,執(zhí)行指令
  for p := range ch {
    if p.Err != nil {
      // 如果是 EOF,說(shuō)明文件讀取完畢
      if p.Err == io.EOF {
        break
      }
      logger.Error(err)
      continue
    }
    if p.Data == nil {
      logger.Error("empty payload")
      continue
    }
    // 將指令轉(zhuǎn)換成 MultiBulkReply 類(lèi)型
    r, ok := p.Data.(*reply.MultiBulkReply)
    if !ok {
      logger.Error("exec multi mulk")
      continue
    }
    // 執(zhí)行指令
    rep := handler.database.Exec(fackConn, r.Args)
    if reply.IsErrReply(rep) {
      logger.Error(rep)
    }
  }
}

到此這篇關(guān)于使用go語(yǔ)言實(shí)現(xiàn)Redis持久化的示例代碼的文章就介紹到這了,更多相關(guān)go實(shí)現(xiàn)Redis持久化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論