Golang?實現(xiàn)Redis?協(xié)議解析器的解決方案
本文是 《用 Golang 實現(xiàn)一個 Redis》系列文章第二篇,本文將分別介紹Redis 通信協(xié)議 以及 協(xié)議解析器 的實現(xiàn),若您對協(xié)議有所了解可以直接閱讀協(xié)議解析器部分。
Redis 通信協(xié)議
Redis 自 2.0 版本起使用了統(tǒng)一的協(xié)議 RESP (REdis Serialization Protocol),該協(xié)議易于實現(xiàn),計算機可以高效的進行解析且易于被人類讀懂。
RESP 是一個二進制安全的文本協(xié)議,工作于 TCP 協(xié)議上。RESP 以行作為單位,客戶端和服務(wù)器發(fā)送的命令或數(shù)據(jù)一律以 \r\n (CRLF)作為換行符。
二進制安全是指允許協(xié)議中出現(xiàn)任意字符而不會導致故障。比如 C 語言的字符串以 \0 作為結(jié)尾不允許字符串中間出現(xiàn)\0, 而 Go 語言的 string 則允許出現(xiàn) \0,我們說 Go 語言的 string 是二進制安全的,而 C 語言字符串不是二進制安全的。
RESP 的二進制安全性允許我們在 key 或者 value 中包含 \r 或者 \n 這樣的特殊字符。在使用 redis 存儲 protobuf、msgpack 等二進制數(shù)據(jù)時,二進制安全性尤為重要。
RESP 定義了5種格式:
- 簡單字符串(Simple String): 服務(wù)器用來返回簡單的結(jié)果,比如"OK"。非二進制安全,且不允許換行。
- 錯誤信息(Error): 服務(wù)器用來返回簡單的錯誤信息,比如"ERR Invalid Synatx"。非二進制安全,且不允許換行。
- 整數(shù)(Integer): llen、scard 等命令的返回值, 64位有符號整數(shù)
- 字符串(Bulk String): 二進制安全字符串, 比如 get 等命令的返回值
- 數(shù)組(Array, 又稱 Multi Bulk Strings): Bulk String 數(shù)組,客戶端發(fā)送指令以及 lrange 等命令響應的格式
RESP 通過第一個字符來表示格式:
- 簡單字符串:以"+" 開始, 如:"+OK\r\n"
- 錯誤:以"-" 開始,如:"-ERR Invalid Synatx\r\n"
- 整數(shù):以":"開始,如:":1\r\n"
- 字符串:以
$開始 - 數(shù)組:以
*開始
Bulk String有兩行,第一行為 $+正文長度,第二行為實際內(nèi)容。如:
$3\r\nSET\r\n
Bulk String 是二進制安全的可以包含任意字節(jié),就是說可以在 Bulk String 內(nèi)部包含 "\r\n" 字符(行尾的CRLF被隱藏):
$4a\r\nb
$-1 表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應即為$-1。
Array 格式第一行為 "*"+數(shù)組長度,其后是相應數(shù)量的 Bulk String。如, ["foo", "bar"]的報文:
*2 $3 foo $3 bar
客戶端也使用 Array 格式向服務(wù)端發(fā)送指令。命令本身將作為第一個參數(shù),如 SET key value指令的RESP報文:
*3 $3 SET $3 key $5 value
將換行符打印出來:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
協(xié)議解析器
我們在 實現(xiàn)TCP服務(wù)器 一文中已經(jīng)介紹過TCP服務(wù)器的實現(xiàn),協(xié)議解析器將實現(xiàn)其 Handler 接口充當應用層服務(wù)器。
協(xié)議解析器將接收 Socket 傳來的數(shù)據(jù),并將其數(shù)據(jù)還原為 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 將被還原為 ['SET', 'key', 'value']。
本文完整代碼: github.com/hdt3213/godis/redis/parser
來自客戶端的請求均為數(shù)組格式,它在第一行中標記報文的總行數(shù)并使用CRLF作為分行符。
bufio 標準庫可以將從 reader 讀到的數(shù)據(jù)緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n') 來保證每次讀取到完整的一行。
需要注意的是RESP是二進制安全的協(xié)議,它允許在正文中使用CRLF字符。舉例來說 Redis 可以正確接收并執(zhí)行SET "a\r\nb" 1指令, 這條指令的正確報文是這樣的:
*3 $3 SET $4 a\r\nb $7 myvalue
當 ReadBytes 讀取到第五行 "a\r\nb\r\n"時會將其誤認為兩行:
*3 $3 SET $4 a // 錯誤的分行 b // 錯誤的分行 $7 myvalue
因此當讀取到第四行$4后, 不應該繼續(xù)使用 ReadBytes('\n') 讀取下一行, 應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內(nèi)容。
msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2 _, err = io.ReadFull(reader, msg)
首先我們來定義解析器的接口:
// Payload stores redis.Reply or error
type Payload struct {
Data redis.Reply
Err error
}
// ParseStream 通過 io.Reader 讀取數(shù)據(jù)并將結(jié)果通過 channel 將結(jié)果返回給調(diào)用者
// 流式處理的接口適合供客戶端/服務(wù)端使用
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}
// ParseOne 解析 []byte 并返回 redis.Reply
func ParseOne(data []byte) (redis.Reply, error) {
ch := make(chan *Payload)
reader := bytes.NewReader(data)
go parse0(reader, ch)
payload := <-ch // parse0 will close the channel
if payload == nil {
return nil, errors.New("no reply")
}
return payload.Data, payload.Err
}接下來我們可以看一下解析器核心流程的偽代碼,您可以在parser.go看到完整代碼:
func parse0(reader io.Reader, ch chan<- *Payload) {
// 初始化讀取狀態(tài)
readingMultiLine := false
expectedArgsCount := 0
var args [][]byte
var bulkLen int64
for {
// 上文中我們提到 RESP 是以行為單位的
// 因為行分為簡單字符串和二進制安全的BulkString,我們需要封裝一個 readLine 函數(shù)來兼容
line, err = readLine(reader, bulkLen)
if err != nil {
// 處理錯誤
return
}
// 接下來我們對剛剛讀取的行進行解析
// 我們簡單的將 Reply 分為兩類:
// 單行: StatusReply, IntReply, ErrorReply
// 多行: BulkReply, MultiBulkReply
if !readingMultiLine {
if isMulitBulkHeader(line) {
// 我們收到了 MulitBulkReply 的第一行
// 獲得 MulitBulkReply 中 BulkString 的個數(shù)
expectedArgsCount = parseMulitBulkHeader(line)
// 等待 MulitBulkReply 后續(xù)行
readingMultiLine = true
} else if isBulkHeader(line) {
// 我們收到了 BulkReply 的第一行
// 獲得 BulkReply 第二行的長度, 通過 bulkLen 告訴 readLine 函數(shù)下一行 BulkString 的長度
bulkLen = parseBulkHeader()
// 這個 Reply 中一共有 1 個 BulkString
expectedArgsCount = 1
// 等待 BulkReply 后續(xù)行
readingMultiLine = true
} else {
// 處理 StatusReply, IntReply, ErrorReply 等單行 Reply
reply := parseSingleLineReply(line)
// 通過 ch 返回結(jié)果
emitReply(ch)
}
} else {
// 進入此分支說明我們正在等待 MulitBulkReply 或 BulkReply 的后續(xù)行
// MulitBulkReply 的后續(xù)行有兩種,BulkHeader 或者 BulkString
if isBulkHeader(line) {
bulkLen = parseBulkHeader()
} else {
// 我們正在讀取一個 BulkString, 它可能是 MulitBulkReply 或 BulkReply
args = append(args, line)
}
if len(args) == expectedArgsCount { // 我們已經(jīng)讀取了所有后續(xù)行
// 通過 ch 返回結(jié)果
emitReply(ch)
// 重置狀態(tài), 準備解析下一條 Reply
readingMultiLine = false
expectedArgsCount = 0
args = nil
bulkLen = 0
}
}
}
}貼一下工具函數(shù)的實現(xiàn):
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var msg []byte
var err error
if state.bulkLen == 0 { // read simple line
msg, err = bufReader.ReadBytes('\n')
if err != nil {
return nil, true, err
}
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
return nil, false, errors.New("protocol error: " + string(msg))
}
} else { // read bulk line (binary safe)
msg = make([]byte, state.bulkLen+2)
_, err = io.ReadFull(bufReader, msg)
if err != nil {
return nil, true, err
}
if len(msg) == 0 ||
msg[len(msg)-2] != '\r' ||
msg[len(msg)-1] != '\n' {
return nil, false, errors.New("protocol error: " + string(msg))
}
state.bulkLen = 0
}
return msg, false, nil
}
func parseMultiBulkHeader(msg []byte, state *readState) error {
var err error
var expectedLine uint64
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if expectedLine == 0 {
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {
// first line of multi bulk reply
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = int(expectedLine)
state.args = make([][]byte, 0, expectedLine)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen == -1 { // null bulk
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
func parseSingleLineReply(msg []byte) (redis.Reply, error) {
str := strings.TrimSuffix(string(msg), "\n")
str = strings.TrimSuffix(str, "\r")
var result redis.Reply
switch msg[0] {
case '+': // status reply
result = reply.MakeStatusReply(str[1:])
case '-': // err reply
result = reply.MakeErrReply(str[1:])
case ':': // int reply
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {
return nil, errors.New("protocol error: " + string(msg))
}
result = reply.MakeIntReply(val)
default:
// parse as text protocol
strs := strings.Split(str, " ")
args := make([][]byte, len(strs))
for i, s := range strs {
args[i] = []byte(s)
}
result = reply.MakeMultiBulkReply(args)
}
return result, nil
}到此這篇關(guān)于Golang 實現(xiàn) Redis 協(xié)議解析器的文章就介紹到這了,更多相關(guān)go redis 協(xié)議解析器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談Go語言多態(tài)的實現(xiàn)與interface使用
如果大家系統(tǒng)的學過C++、Java等語言以及面向?qū)ο蟮脑?,相信應該對多態(tài)不會陌生。多態(tài)是面向?qū)ο蠓懂牣斨薪?jīng)常使用并且非常好用的一個功能,它主要是用在強類型語言當中,像是Python這樣的弱類型語言,變量的類型可以隨意變化,也沒有任何限制,其實區(qū)別不是很大2021-06-06

