Golang Socket Server自定義協(xié)議的簡單實(shí)現(xiàn)方案
在Server和Client通訊中,由于網(wǎng)絡(luò)等原因很可能會發(fā)生數(shù)據(jù)丟包的現(xiàn)象。如果數(shù)據(jù)缺失,服務(wù)端接收的信息不完整,就會造成混亂。
我們需要在Server和Client之間建立一個(gè)通訊協(xié)議,通過協(xié)議中的規(guī)則,判斷當(dāng)前接收到的信息是否完整。根據(jù)信息的完整情況,采取不同的處理方式。
通訊協(xié)議protocol的核心就是設(shè)計(jì)一個(gè)頭部。如果傳來的信息不包含這個(gè)頭部,就說明當(dāng)前信息和之前的信息是同一條。那么就把當(dāng)前信息和之前的那條信息合并成一條。
而協(xié)議主要包含的功能是封裝(Enpack)和解析(Depack)。Enpack是客戶端對信息進(jìn)行數(shù)據(jù)封裝。封裝之后可以傳遞給服務(wù)器。Depack是服務(wù)器對信息進(jìn)行數(shù)據(jù)解析。
其中有個(gè)Const部分,用于定義頭部、頭部長度、客戶端傳入信息長度。
在代碼中,我們這樣定義:
const ( ConstHeader = "Headers" ConstHeaderLength = 7 ConstMLength = 4 )
頭部的內(nèi)容為"Headers",長度為7。所以ConstHeaderLenth=7.
而信息傳遞過程中,我們會把int類型轉(zhuǎn)換成byte類型。一個(gè)int的長度等于4個(gè)byte的長度。因此,我們設(shè)置ConstMLength=4.代表客戶端的傳來的信息大小。
自定義協(xié)議protocal的代碼示例如下:
/** * protocol * @Author: Jian Junbo * @Email: junbojian@qq.com * @Create: 2017/9/14 11:49 * * Description: 通訊協(xié)議處理 */ package protocol import ( "bytes" "encoding/binary" ) const ( ConstHeader = "Headers" ConstHeaderLength = 7 ConstMLength = 4 ) //封包 func Enpack(message []byte) []byte { return append(append([]byte(ConstHeader), IntToBytes(len(message))...), message...) } //解包 func Depack(buffer []byte) []byte { length := len(buffer) var i int data := make([]byte, 32) for i = 0; i < length; i++ { if length < i + ConstHeaderLength + ConstMLength{ break } if string(buffer[i:i+ConstHeaderLength]) == ConstHeader { messageLength := ByteToInt(buffer[i+ConstHeaderLength : i+ConstHeaderLength+ConstMLength]) if length < i+ConstHeaderLength+ConstMLength+messageLength { break } data = buffer[i+ConstHeaderLength+ConstMLength : i+ConstHeaderLength+ConstMLength+messageLength] } } if i == length { return make([]byte, 0) } return data } //字節(jié)轉(zhuǎn)換成整形 func ByteToInt(n []byte) int { bytesbuffer := bytes.NewBuffer(n) var x int32 binary.Read(bytesbuffer, binary.BigEndian, &x) return int(x) } //整數(shù)轉(zhuǎn)換成字節(jié) func IntToBytes(n int) []byte { x := int32(n) bytesBuffer := bytes.NewBuffer([]byte{}) binary.Write(bytesBuffer, binary.BigEndian, x) return bytesBuffer.Bytes() }
Server端主要通過協(xié)議來解析客戶端發(fā)送來的信息。建立一個(gè)函數(shù),用來完成連接對接收信息的處理。其中建立了通道readerChannel,并把接收來的信息放在通道里。
在放入通道之前,使用protocol和Depack對信息進(jìn)行解析。
//連接處理 func handleConnection(conn net.Conn) { //緩沖區(qū),存儲被截?cái)嗟臄?shù)據(jù) tmpBuffer := make([]byte, 0) //接收解包 readerChannel := make(chan []byte, 10000) go reader(readerChannel) buffer := make([]byte, 1024) for{ n, err := conn.Read(buffer) if err != nil{ Log(conn.RemoteAddr().String(), "connection error: ", err) return } tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...)) readerChannel <- tmpBuffer //接收的信息寫入通道 } defer conn.Close() }
如果信息讀取發(fā)生錯(cuò)誤(包括讀取到信息結(jié)束符EOF),都會打印錯(cuò)誤信息,并跳出循環(huán)。
Log(conn.RemoteAddr().String(), "connection error: ", err)
return
由于通道內(nèi)的數(shù)據(jù)是[]byte型的。需要轉(zhuǎn)換成string。這個(gè)工作有專門的獲取通道數(shù)據(jù)的reader(readerChannel chan []byte)來完成。
//獲取通道數(shù)據(jù) func reader(readerchannel chan []byte) { for{ select { case data := <-readerchannel: Log(string(data)) //打印通道內(nèi)的信息 } } }
查看Server端代碼示例:
/** * MySocketProtocalServer * @Author: Jian Junbo * @Email: junbojian@qq.com * @Create: 2017/9/14 13:54 * Copyright (c) 2017 Jian Junbo All rights reserved. * * Description: 服務(wù)端,接收客戶端傳來的信息 */ package main import ( "net" "fmt" "os" "log" "protocol" ) func main() { netListen, err := net.Listen("tcp", "localhost:7373") CheckErr(err) defer netListen.Close() Log("Waiting for client ...") //啟動后,等待客戶端訪問。 for{ conn, err := netListen.Accept() //監(jiān)聽客戶端 if err != nil { Log(conn.RemoteAddr().String(), "發(fā)了了錯(cuò)誤:", err) continue } Log(conn.RemoteAddr().String(), "tcp connection success") go handleConnection(conn) } } //連接處理 func handleConnection(conn net.Conn) { //緩沖區(qū),存儲被截?cái)嗟臄?shù)據(jù) tmpBuffer := make([]byte, 0) //接收解包 readerChannel := make(chan []byte, 10000) go reader(readerChannel) buffer := make([]byte, 1024) for{ n, err := conn.Read(buffer) if err != nil{ Log(conn.RemoteAddr().String(), "connection error: ", err) return } tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...)) readerChannel <- tmpBuffer //接收的信息寫入通道 } defer conn.Close() } //獲取通道數(shù)據(jù) func reader(readerchannel chan []byte) { for{ select { case data := <-readerchannel: Log(string(data)) //打印通道內(nèi)的信息 } } } //日志處理 func Log(v ...interface{}) { log.Println(v...) } //錯(cuò)誤處理 func CheckErr(err error) { if err != nil { fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) os.Exit(1) } }
客戶端使用Enpack封裝要發(fā)送到服務(wù)端的信息后,寫入連接conn中。
/** * MySocketProtocalClient * @Author: Jian Junbo * @Email: junbojian@qq.com * @Create: 2017/9/14 15:23 * Copyright (c) 2017 Jian Junbo All rights reserved. * * Description: */ package main import ( "net" "time" "strconv" "protocol" "fmt" "os" ) //發(fā)送100次請求 func send(conn net.Conn) { for i := 0; i < 100; i++ { session := GetSession() words := "{\"ID\":\""+strconv.Itoa(i)+"\",\"Session\":\""+session+"20170914165908\",\"Meta\":\"golang\",\"Content\":\"message\"}" conn.Write(protocol.Enpack([]byte(words))) fmt.Println(words) //打印發(fā)送出去的信息 } fmt.Println("send over") defer conn.Close() } //用當(dāng)前時(shí)間做識別。當(dāng)前時(shí)間的十進(jìn)制整數(shù) func GetSession() string { gs1 := time.Now().Unix() gs2 := strconv.FormatInt(gs1, 10) return gs2 } func main() { server := "localhost:7373" tcpAddr, err := net.ResolveTCPAddr("tcp4", server) if err != nil{ fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) os.Exit(1) } conn, err := net.DialTCP("tcp", nil, tcpAddr) if err != nil{ fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) os.Exit(1) } fmt.Println("connect success") send(conn) }
補(bǔ)充:golang從0到1利用socket編程實(shí)現(xiàn)一個(gè)簡單的http服務(wù)器
開始編程
第一份代碼
package main import ( "fmt" "net" ) func accept_request_thread(conn net.Conn) { defer conn.Close() for { // 創(chuàng)建一個(gè)新切片, 用作保存數(shù)據(jù)的緩沖區(qū) buf := make([]byte, 1024) n, err := conn.Read(buf) // 從conn中讀取客戶端發(fā)送的數(shù)據(jù)內(nèi)容 if err != nil { fmt.Printf("客戶端退出 err=%v\n", err) return } fmt.Printf(" 接受消息 %s\n", string(buf[:n])) } } func main() { listen, err := net.Listen("tcp", ":8888") // 創(chuàng)建用于監(jiān)聽的 socket if err != nil { fmt.Println("listen err=", err) return } fmt.Println("監(jiān)聽套接字,創(chuàng)建成功, 服務(wù)器開始監(jiān)聽。。。") defer listen.Close() // 服務(wù)器結(jié)束前關(guān)閉 listener // 循環(huán)等待客戶端來鏈接 for { fmt.Println("阻塞等待客戶端來鏈接...") conn, err := listen.Accept() // 創(chuàng)建用戶數(shù)據(jù)通信的socket if err != nil { fmt.Println("Accept() err=", err) } else { fmt.Println("通信套接字,創(chuàng)建成功。。。") } // 這里準(zhǔn)備起一個(gè)協(xié)程,為客戶端服務(wù) go accept_request_thread(conn) } }
瀏覽器發(fā)送一個(gè)get請求:
http://192.168.0.20:8888/api/camera/get_ptz?camera_id=1324566666789876543
服務(wù)端接受到的消息如下:
http://192.168.0.20:8888/api/camera/get_ptz?camera_id=1324566666789876543
我們接下來的任務(wù)就是 解析這些字符串,從中獲取 當(dāng)前是什么方法,什么請求,參數(shù)是什么?
先定義一個(gè)小目標(biāo),獲取當(dāng)前是什么方法。
處理一個(gè)簡單的get請求
package main import ( "encoding/json" "fmt" "log" "net" "strings" ) func unimplemented(conn net.Conn){ var buf string buf = "HTTP/1.0 501 Method Not Implemented\r\n" _, _ = conn.Write([]byte(buf)) buf = "Server: httpd/0.1.0\r\n" _, _ = conn.Write([]byte(buf)) buf = "Content-Type: text/html\r\n" _, _ = conn.Write([]byte(buf)) buf = "\r\n" _, _ = conn.Write([]byte(buf)) buf = "<HTML><HEAD><TITLE>Method Not Implemented\r\n" _, _ = conn.Write([]byte(buf)) buf = "</TITLE></HEAD>\r\n" _, _ = conn.Write([]byte(buf)) buf = "<BODY><P>HTTP request method not supported.\r\n" _, _ = conn.Write([]byte(buf)) buf = "</BODY></HTML>\r\n" _, _ = conn.Write([]byte(buf)) } func accept_request_thread(conn net.Conn) { defer conn.Close() var i int buf := make([]byte, 1024) n, err := conn.Read(buf) // 從conn中讀取客戶端發(fā)送的數(shù)據(jù)內(nèi)容 if err != nil { fmt.Printf("客戶端退出 err=%v\n", err) return } // 獲取方法 i = 0 var method_bt strings.Builder for(i < n && buf[i] != ' '){ method_bt.WriteByte(buf[i]) i++; } method := method_bt.String() if(method != "GET"){ unimplemented(conn) return } for(i < n && buf[i] == ' '){ i++ } //api/camera/get_ptz?camera_id=1324566666789876543 var url_bt strings.Builder for(i < n && buf[i] != ' '){ url_bt.WriteByte(buf[i]) i++; } url := url_bt.String() if(method == "GET"){ //url ---> /api/camera/get_ptz?camera_id=1324566666789876543 // 跳到第一個(gè)? var path, query_string string j := strings.IndexAny(url, "?") if(j != -1){ path = url[:j] if(j + 1 < len(url)){ query_string = url[j+1:] } }else{ path = url } fmt.Print(path + "請求已經(jīng)創(chuàng)建\t") resp := execute(path, query_string)// =1324566666789876543 fmt.Println("返回", string(resp)) header(conn, "application/json", len(resp)); _ , err := conn.Write(resp) if(err != nil){ fmt.Println(err) } } } //回應(yīng)客戶端必須先設(shè)置好head頭,瀏覽器才能解析 func header(conn net.Conn, content_type string , length int ) { var buf string buf = "HTTP/1.0 200 OK\r\n" _, _ = conn.Write([]byte(buf)) buf = "Server: httpd/0.1.0\r\n" _, _ = conn.Write([]byte(buf)) buf = "Content-Type: " + content_type + "\r\n" _, _ = conn.Write([]byte(buf)) _, _ = fmt.Sscanf(buf, "Content-Length: %d\r\n", length) buf = "Content-Type: " + content_type + "\r\n" _, _ = conn.Write([]byte(buf)) buf = "\r\n" _, _ = conn.Write([]byte(buf)) } func execute(path string, query_string string) ([]byte) { query_params := make(map[string]string) parse_query_params(query_string, query_params) if("/api/camera/get_ptz" == path){ /* * do something */ camera_id := query_params["camera_id"] resp := make(map[string]interface{}) resp["camera_id"] = camera_id resp["code"] = 200 resp["msg"] = "ok" rs, err := json.Marshal(resp) if err != nil{ log.Fatalln(err) } return rs }else if("get_abc" == path){ /* * do something */ return []byte("abcdcvfdswa") } return []byte("do't match") } /*map作為函數(shù)入?yún)⑹亲鳛橹羔樳M(jìn)行傳遞的 函數(shù)里面對map進(jìn)行修改時(shí),會同時(shí)修改源map的值,但是將map修改為nil時(shí),則達(dá)不到預(yù)期效果。*/ // camera_id=1324566666789876543&tt=%E5%88%9B%E5%BB%BA%E6%88%90%E5%8A%9F func parse_query_params(query_string string, query_params map[string]string) { kvs := strings.Split(query_string, "&") if(len(kvs) == 0){ return } for _, kv := range kvs { kv := strings.Split(kv, "=") if(len(kv) != 2){ continue } query_params[kv[0]] = kv[1] } } func main() { listen, err := net.Listen("tcp", ":8888") // 創(chuàng)建用于監(jiān)聽的 socket if err != nil { fmt.Println("listen err=", err) return } fmt.Println("監(jiān)聽套接字,創(chuàng)建成功, 服務(wù)器開始監(jiān)聽。。。") defer listen.Close() // 服務(wù)器結(jié)束前關(guān)閉 listener // 循環(huán)等待客戶端鏈接 for { fmt.Println("阻塞等待客戶端鏈接...") conn, err := listen.Accept() // 創(chuàng)建用戶數(shù)據(jù)通信的socket if err != nil { panic("Accept() err= " + err.Error()) } // 這里準(zhǔn)備起一個(gè)協(xié)程,為客戶端服務(wù) go accept_request_thread(conn) } }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。
相關(guān)文章
golang進(jìn)程在docker中OOM后hang住問題解析
這篇文章主要介紹了golang進(jìn)程在docker中OOM后hang住問題解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10GoFrame框架gset使用對比PHP?Java?Redis優(yōu)勢
這篇文章主要為大家介紹了GoFrame框架gset對比PHP?Java?Redis的使用優(yōu)勢詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06