Golang實現(xiàn)自己的Redis(pipeline客戶端)實例探索
引言
用11篇文章實現(xiàn)一個可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關(guān)注學(xué)習(xí)。
- [x] easyredis之TCP服務(wù)
- [x] easyredis之網(wǎng)絡(luò)請求序列化協(xié)議(RESP)
- [x] easyredis之內(nèi)存數(shù)據(jù)庫
- [x] easyredis之過期時間 (時間輪實現(xiàn))
- [x] easyredis之持久化 (AOF實現(xiàn))
- [x] easyredis之發(fā)布訂閱功能
- [x] easyredis之有序集合(跳表實現(xiàn))
- [x] easyredis之 pipeline 客戶端實現(xiàn)
- [ ] easyredis之事務(wù)(原子性/回滾)
- [ ] easyredis之連接池
- [ ] easyredis之分布式集群存儲
EasyRedis之pipeline客戶端
網(wǎng)絡(luò)編程的一個基礎(chǔ)知識:用同一個sokcet連接發(fā)送多個數(shù)據(jù)包的時候,我們一般的做法是,發(fā)送并立刻接收結(jié)果,在沒有接收到,是不會繼續(xù)發(fā)送數(shù)據(jù)包。這種方法簡單,但是效率太低。時間都浪費在等待上了...
socket的【發(fā)送緩沖區(qū)和接收緩沖區(qū)】是分離的,也就是發(fā)送不用等待接收,接收也不用等待發(fā)送。
所以我們可以把我們要發(fā)送的多個數(shù)據(jù)包【數(shù)據(jù)包1/數(shù)據(jù)包2...數(shù)據(jù)包N】復(fù)用同一個連接,通過發(fā)送緩沖區(qū)按順序都發(fā)送給服務(wù)端。服務(wù)端處理請求的順序,也是按照【數(shù)據(jù)包1/數(shù)據(jù)包2...數(shù)據(jù)包N】這個順序處理的。當(dāng)處理完以后,處理結(jié)果將按照【數(shù)據(jù)包結(jié)果1/數(shù)據(jù)包結(jié)果2...數(shù)據(jù)包結(jié)果N】順序發(fā)送給客戶端的接收緩沖區(qū)。客戶端只需要從接收緩沖區(qū)中讀取數(shù)據(jù),并保存到請求數(shù)據(jù)包上,即可。這樣我們就可以將發(fā)送和接收分離開來。一個協(xié)程只負(fù)責(zé)發(fā)送,一個協(xié)程只負(fù)責(zé)接收,互相不用等待。關(guān)鍵在于保證發(fā)送和接收的順序是相同的設(shè)計邏輯圖如下:

代碼路徑redis/client/client.go整個代碼也就是200多行,結(jié)合上圖非常容易理解
創(chuàng)建客戶端
type RedisClent struct {
// socket連接
conn net.Conn
addr string
// 客戶端當(dāng)前狀態(tài)
connStatus atomic.Int32
// heartbeat
ticker time.Ticker
// buffer cache
waitSend chan *request
waitResult chan *request
// 有請求正在處理中...
working sync.WaitGroup
}
// 創(chuàng)建redis客戶端socket
func NewRedisClient(addr string) (*RedisClent, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
returnnil, err
}
rc := RedisClent{}
rc.conn = conn
rc.waitSend = make(chan *request, maxChanSize)
rc.waitResult = make(chan *request, maxChanSize)
rc.addr = addr
return &rc, nil
}
// 啟動
func (rc *RedisClent) Start() error {
rc.ticker = *time.NewTicker(heartBeatInterval)
// 將waitSend緩沖區(qū)進行發(fā)送
go rc.execSend()
// 獲取服務(wù)端結(jié)果
go rc.execReceive()
// 定時發(fā)送心跳
go rc.execHeardBeat()
rc.connStatus.Store(connRunning) // 啟動狀態(tài)
returnnil
}發(fā)送Redis命令
將command [][]byte保存到緩沖區(qū) rc.waitSend中
// 將redis命令保存到 waitSend 中
func (rc *RedisClent) Send(command [][]byte) (protocol.Reply, error) {
// 已關(guān)閉
if rc.connStatus.Load() == connClosed {
returnnil, errors.New("client closed")
}
req := &request{
command: command,
wait: wait.Wait{},
}
// 單個請求
req.wait.Add(1)
// 所有請求
rc.working.Add(1)
defer rc.working.Done()
// 將數(shù)據(jù)保存到緩沖中
rc.waitSend <- req
// 等待處理結(jié)束
if req.wait.WaitWithTimeOut(maxWait) {
returnnil, errors.New("time out")
}
// 出錯
if req.err != nil {
err := req.err
returnnil, err
}
// 正常
return req.reply, nil
}發(fā)送Redis命令到服務(wù)端
// 將waitSend緩沖區(qū)進行發(fā)送
func (rc *RedisClent) execSend() {
for req := range rc.waitSend {
rc.sendReq(req)
}
}
func (rc *RedisClent) sendReq(req *request) {
// 無效請求
if req == nil || len(req.command) == 0 {
return
}
var err error
// 網(wǎng)絡(luò)請求(重試3次)
for i := 0; i < 3; i++ {
_, err = rc.conn.Write(req.Bytes())
// 發(fā)送成功 or 發(fā)送錯誤(除了超時錯誤和deadline錯誤)跳出
if err == nil ||
(!strings.Contains(err.Error(), "timeout") && // only retry timeout
!strings.Contains(err.Error(), "deadline exceeded")) {
break
}
}
if err == nil { // 發(fā)送成功,異步等待結(jié)果
rc.waitResult <- req
} else { // 發(fā)送失敗,請求直接失敗
req.err = err
req.wait.Done()
}
}從服務(wù)端讀取數(shù)據(jù)
func (rc *RedisClent) execReceive() {
ch := parser.ParseStream(rc.conn)
for payload := range ch {
if payload.Err != nil {
if rc.connStatus.Load() == connClosed { // 連接已關(guān)閉
return
}
// 否則,重新連接(可能因為網(wǎng)絡(luò)抖動臨時斷開了)
rc.reconnect()
return
}
// 說明一切正常
rc.handleResult(payload.Reply)
}
}
func (rc *RedisClent) handleResult(reply protocol.Reply) {
// 從rc.waitResult 獲取一個等待中的請求,將結(jié)果保存進去
req := <-rc.waitResult
if req == nil {
return
}
req.reply = reply
req.wait.Done() // 通知已經(jīng)獲取到結(jié)果
}斷線重連
因為網(wǎng)絡(luò)抖動可能存在連接斷開的情況,所以需要有重連的功能
func (rc *RedisClent) reconnect() {
logger.Info("redis client reconnect...")
rc.conn.Close()
var conn net.Conn
// 重連(重試3次)
for i := 0; i < 3; i++ {
var err error
conn, err = net.Dial("tcp", rc.addr)
if err != nil {
logger.Error("reconnect error: " + err.Error())
time.Sleep(time.Second)
continue
} else {
break
}
}
// 服務(wù)端連不上,說明服務(wù)可能掛了(or 網(wǎng)絡(luò)問題 and so on...)
if conn == nil {
rc.Stop()
return
}
// 這里關(guān)閉沒問題,因為rc.conn.Close已經(jīng)關(guān)閉,函數(shù)Send中保存的請求因為發(fā)送不成功,不會寫入到waitResult
close(rc.waitResult)
// 清理 waitResult(因為連接重置,新連接上只能處理新請求,老的請求的數(shù)據(jù)結(jié)果在老連接上,老連接已經(jīng)關(guān)了,新連接上肯定是沒有結(jié)果的)
for req := range rc.waitResult {
req.err = errors.New("connect reset")
req.wait.Done()
}
// 新連接(新氣象)
rc.waitResult = make(chan *request, maxWait)
rc.conn = conn
// 重新啟動接收協(xié)程
go rc.execReceive()
}項目代碼地址: https://github.com/gofish2020/easyredis
以上就是Golang實現(xiàn)自己的Redis(pipeline客戶端)實例探索的詳細(xì)內(nèi)容,更多關(guān)于Golang Redis pipeline客戶端的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言中Gin框架使用JWT實現(xiàn)登錄認(rèn)證的方案
在如今前后端分離開發(fā)的大環(huán)境中,我們需要解決一些登陸,后期身份認(rèn)證以及鑒權(quán)相關(guān)的事情,通常的方案就是采用請求頭攜帶token的方式進行實現(xiàn),本文給大家介紹了Go語言中Gin框架使用JWT實現(xiàn)登錄認(rèn)證的方案,需要的朋友可以參考下2024-11-11
golang復(fù)制文件夾移動到另一個文件夾實現(xiàn)方法詳解
這篇文章主要為大家介紹了golang復(fù)制文件夾并移動到另一個文件夾實現(xiàn)方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-07-07
使用Golong實現(xiàn)JWT身份驗證的詳細(xì)過程
JWT提供了一種強大而靈活的方法來處理Web應(yīng)用程序中的身份驗證和授權(quán),本教程將引導(dǎo)您逐步實現(xiàn)Go應(yīng)用程序中的JWT身份驗證過程,感興趣的朋友跟隨小編一起看看吧2024-03-03
Golang實現(xiàn)多存儲驅(qū)動設(shè)計SDK案例
這篇文章主要介紹了Golang實現(xiàn)多存儲驅(qū)動設(shè)計SDK案例,Gocache是一個基于Go語言編寫的多存儲驅(qū)動的緩存擴展組件,更多具體內(nèi)容感興趣的小伙伴可以參考一下2022-09-09
基于go-cqhttp與Flask搭建定制機器人項目實戰(zhàn)示例
這篇文章主要為大家介紹了基于go-cqhttp與Flask搭建定制機器人項目實戰(zhàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-11-11

