Golang?編寫Tcp服務(wù)器的解決方案
Golang 開發(fā) Tcp 服務(wù)器及拆包粘包、優(yōu)雅關(guān)閉的解決方案
Golang 作為廣泛用于服務(wù)端和云計算領(lǐng)域的編程語言,tcp socket 是其中至關(guān)重要的功能。您可以在 github.com/hdt3213/godis/tcp 中看到本文所述 TCP 服務(wù)器的完整代碼及其應(yīng)用。
早期的 Tomcat/Apache 服務(wù)器使用的是阻塞 IO 模型。它使用一個線程處理一個連接,在沒有收到新數(shù)據(jù)時監(jiān)聽線程處于阻塞狀態(tài),直到數(shù)據(jù)就緒后線程被喚醒。因為阻塞 IO 模型需要開啟大量線程并且頻繁地進行上下文切換,所以效率很差。
IO 多路復(fù)用技術(shù)為了解決上述問題采用了一個線程監(jiān)聽多路連接的方案。一個線程持有多個連接并阻塞等待,當(dāng)其中某個連接可讀寫時線程被喚醒進行處理。因為多個連接復(fù)用了一個線程所以 IO 多路復(fù)用需要的線程數(shù)少很多。
主流操作系統(tǒng)都提供了IO多路復(fù)用技術(shù)的實現(xiàn),比如 Linux上的 epoll,freeBSD 上的 kqueue 以及 Windows 平臺上的 iocp。有得必有失,因為 epoll 等技術(shù)提供的接口面向 IO 事件而非面向連接,所以需要編寫復(fù)雜的異步代碼,開發(fā)難度很大。
Golang 的 netpoller
基于IO多路復(fù)用和 goroutine scheduler 構(gòu)建了一個簡潔高性能的網(wǎng)絡(luò)模型,并給開發(fā)者提供了 goroutine-per-connection
風(fēng)格的極簡接口。
更多關(guān)于 netpoller
的剖析可以參考Golang實現(xiàn)四種負載均衡的算法(隨機,輪詢等), 接下來我們嘗試用 netpoller
編寫我們的服務(wù)器。
Echo 服務(wù)器
作為開始我們來實現(xiàn)一個簡單的 Echo 服務(wù)器。它會接受客戶端連接并將客戶端發(fā)送的內(nèi)容原樣傳回客戶端。
package main import ( "fmt" "net" "io" "log" "bufio" ) func ListenAndServe(address string) { // 綁定監(jiān)聽地址 listener, err := net.Listen("tcp", address) if err != nil { log.Fatal(fmt.Sprintf("listen err: %v", err)) } defer listener.Close() log.Println(fmt.Sprintf("bind: %s, start listening...", address)) for { // Accept 會一直阻塞直到有新的連接建立或者listen中斷才會返回 conn, err := listener.Accept() if err != nil { // 通常是由于listener被關(guān)閉無法繼續(xù)監(jiān)聽導(dǎo)致的錯誤 log.Fatal(fmt.Sprintf("accept err: %v", err)) } // 開啟新的 goroutine 處理該連接 go Handle(conn) } } func Handle(conn net.Conn) { // 使用 bufio 標(biāo)準(zhǔn)庫提供的緩沖區(qū)功能 reader := bufio.NewReader(conn) for { // ReadString 會一直阻塞直到遇到分隔符 '\n' // 遇到分隔符后會返回上次遇到分隔符或連接建立后收到的所有數(shù)據(jù), 包括分隔符本身 // 若在遇到分隔符之前遇到異常, ReadString 會返回已收到的數(shù)據(jù)和錯誤信息 msg, err := reader.ReadString('\n') if err != nil { // 通常遇到的錯誤是連接中斷或被關(guān)閉,用io.EOF表示 if err == io.EOF { log.Println("connection close") } else { log.Println(err) } return } b := []byte(msg) // 將收到的信息發(fā)送給客戶端 conn.Write(b) } } func main() { ListenAndServe(":8000") }
使用 telnet 工具測試我們編寫的 Echo 服務(wù)器:
$ telnet 127.0.0.1 8000 Trying 127.0.0.1... Connected to 127.0.0.1. Escape character is '^]'. > a a > b b Connection closed by foreign host.
拆包與粘包問題
某些朋友可能看到"拆包與粘包"后表示極度震驚,并再三強調(diào): TCP是個字節(jié)流協(xié)議,不存在粘包問題。
我們常說的 TCP 服務(wù)器并非「實現(xiàn) TCP 協(xié)議的服務(wù)器」而是「基于TCP協(xié)議的應(yīng)用層服務(wù)器」。TCP 是面向字節(jié)流的協(xié)議,而應(yīng)用層協(xié)議大多是面向消息的,比如 HTTP 協(xié)議的請求/響應(yīng),Redis 協(xié)議的指令/回復(fù)都是以消息為單位進行通信的。
作為應(yīng)用層服務(wù)器我們有責(zé)任從 TCP 提供的字節(jié)流中正確地解析出應(yīng)用層消息,在這一步驟中我們會遇到「拆包/粘包」問題。
socket 允許我們通過 read 函數(shù)讀取新收到的一段數(shù)據(jù)(當(dāng)然這段數(shù)據(jù)并不對應(yīng)一個 TCP 包)。在上文的 Echo 服務(wù)器示例中我們用\n
表示消息結(jié)束,從 read 函數(shù)讀取的數(shù)據(jù)可能存在下列幾種情況:
- 收到兩段數(shù)據(jù): "abc", "def\n" 它們屬于一條消息 "abcdef\n" 這是拆包的情況
- 收到一段數(shù)據(jù): "abc\ndef\n" 它們屬于兩條消息 "abc\n", "def\n" 這是粘包的情況
應(yīng)用層協(xié)議通常采用下列幾種思路之一來定義消息,以保證完整地進行讀取:
- 定長消息
- 在消息尾部添加特殊分隔符,如示例中的Echo協(xié)議和FTP控制協(xié)議。bufio 標(biāo)準(zhǔn)庫會緩存收到的數(shù)據(jù)直到遇到分隔符才會返回,它可以幫助我們正確地分割字節(jié)流。
- 將消息分為 header 和 body, 并在 header 中提供 body 總長度,這種分包方式被稱為 LTV(length,type,value) 包。這是應(yīng)用最廣泛的策略,如HTTP協(xié)議。當(dāng)從 header 中獲得 body 長度后, io.ReadFull 函數(shù)會讀取指定長度字節(jié)流,從而解析應(yīng)用層消息。
在沒有具體應(yīng)用層協(xié)議的情況下,我們很難詳細地討論拆包與粘包問題。在本系列的第二篇文章: 實現(xiàn) Redis 協(xié)議解析器 中我們可以看到 Redis 序列化協(xié)議(RESP)對分隔符和 LTV 包的結(jié)合應(yīng)用,以及兩種分包方式的具體解析代碼。
優(yōu)雅關(guān)閉
在生產(chǎn)環(huán)境下需要保證TCP服務(wù)器關(guān)閉前完成必要的清理工作,包括將完成正在進行的數(shù)據(jù)傳輸,關(guān)閉TCP連接等。這種關(guān)閉模式稱為優(yōu)雅關(guān)閉,可以避免資源泄露以及客戶端未收到完整數(shù)據(jù)導(dǎo)致故障。
TCP 服務(wù)器的優(yōu)雅關(guān)閉模式通常為: 先關(guān)閉listener阻止新連接進入,然后遍歷所有連接逐個進行關(guān)閉。首先修改一下TCP服務(wù)器:
// handler 是應(yīng)用層服務(wù)器的抽象 type Handler interface { Handle(ctx context.Context, conn net.Conn) Close()error } // 監(jiān)聽并提供服務(wù),并在收到 closeChan 發(fā)來的關(guān)閉通知后關(guān)閉 func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) { // 監(jiān)聽關(guān)閉通知 go func() { <-closeChan logger.Info("shutting down...") // 停止監(jiān)聽,listener.Accept()會立即返回 io.EOF _ = listener.Close() // 關(guān)閉應(yīng)用層服務(wù)器 _ = handler.Close() }() // 在異常退出后釋放資源 defer func() { // close during unexpected error _ = listener.Close() _ = handler.Close() }() ctx := context.Background() var waitDone sync.WaitGroup for { // 監(jiān)聽端口, 阻塞直到收到新連接或者出現(xiàn)錯誤 conn, err := listener.Accept() if err != nil { break } // 開啟 goroutine 來處理新連接 logger.Info("accept link") waitDone.Add(1) go func() { defer func() { waitDone.Done() }() handler.Handle(ctx, conn) }() } waitDone.Wait() } // ListenAndServeWithSignal 監(jiān)聽中斷信號并通過 closeChan 通知服務(wù)器關(guān)閉 func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error { closeChan := make(chan struct{}) sigCh := make(chan os.Signal) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) go func() { sig := <-sigCh switch sig { case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: closeChan <- struct{}{} } }() listener, err := net.Listen("tcp", cfg.Address) if err != nil { return err } logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address)) ListenAndServe(listener, handler, closeChan) return nil }
接下來修改應(yīng)用層服務(wù)器:
// 客戶端連接的抽象 type Client struct { // tcp 連接 Conn net.Conn // 當(dāng)服務(wù)端開始發(fā)送數(shù)據(jù)時進入waiting, 阻止其它goroutine關(guān)閉連接 // wait.Wait是作者編寫的帶有最大等待時間的封裝: // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go Waiting wait.Wait } type EchoHandler struct { // 保存所有工作狀態(tài)client的集合(把map當(dāng)set用) // 需使用并發(fā)安全的容器 activeConn sync.Map // 關(guān)閉狀態(tài)標(biāo)識位 closing atomic.AtomicBool } func MakeEchoHandler()(*EchoHandler) { return &EchoHandler{} } func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) { // 關(guān)閉中的 handler 不會處理新連接 if h.closing.Get() { conn.Close() return } client := &Client { Conn: conn, } h.activeConn.Store(client, struct{}{}) // 記住仍然存活的連接 reader := bufio.NewReader(conn) for { msg, err := reader.ReadString('\n') if err != nil { if err == io.EOF { logger.Info("connection close") h.activeConn.Delete(client) } else { logger.Warn(err) } return } // 發(fā)送數(shù)據(jù)前先置為waiting狀態(tài),阻止連接被關(guān)閉 client.Waiting.Add(1) // 模擬關(guān)閉時未完成發(fā)送的情況 //logger.Info("sleeping") //time.Sleep(10 * time.Second) b := []byte(msg) conn.Write(b) // 發(fā)送完畢, 結(jié)束waiting client.Waiting.Done() } } // 關(guān)閉客戶端連接 func (c *Client)Close()error { // 等待數(shù)據(jù)發(fā)送完成或超時 c.Waiting.WaitWithTimeout(10 * time.Second) c.Conn.Close() return nil } // 關(guān)閉服務(wù)器 func (h *EchoHandler)Close()error { logger.Info("handler shutting down...") h.closing.Set(true) // 逐個關(guān)閉連接 h.activeConn.Range(func(key interface{}, val interface{})bool { client := key.(*Client) client.Close() return true }) return nil }
到此這篇關(guān)于Golang 編寫Tcp服務(wù)器的解決方案的文章就介紹到這了,更多相關(guān)go tcp服務(wù)器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言VScode?see?'go?help?modules'?(exit?statu
最近上手學(xué)習(xí)go語言,準(zhǔn)備在VSCode上寫程序的時候卻發(fā)現(xiàn)出了一點問題,下面這篇文章主要給大家介紹了關(guān)于go語言VScode?see?'go?help?modules'(exit?status?1)問題的解決過程,需要的朋友可以參考下2022-07-07解決golang post文件時Content-Type出現(xiàn)的問題
這篇文章主要介紹了解決golang post文件時Content-Type出現(xiàn)的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05Golang優(yōu)雅關(guān)閉channel的方法示例
Goroutine和channel是Go在“并發(fā)”方面兩個核心feature,下面這篇文章主要給大家介紹了關(guān)于Golang如何優(yōu)雅關(guān)閉channel的相關(guān)資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考解決,下面來一起看看吧。2017-11-11golang 跳出多重循環(huán)的高級break用法說明
這篇文章主要介紹了golang 跳出多重循環(huán)的高級break用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12