go實現(xiàn)grpc四種數(shù)據(jù)流模式
1. 什么是數(shù)據(jù)流
grpc中的stream,srteam顧名思義就是一種流,可以源源不斷的推送數(shù)據(jù),很適合傳輸一些大數(shù)據(jù),或者服務端和客戶端長時間數(shù)據(jù)交互,比如客戶端可以向服務端訂閱一個數(shù)據(jù),服務端就可以利用stream,源源不斷地推送數(shù)據(jù)。
底層還原成socket編程
2. grpc的四種數(shù)據(jù)流
1.簡單模式
2.服務端數(shù)據(jù)流模式(Server-side streaming RPC)
3.客戶端數(shù)據(jù)流模式(Client-side streaming RPC)
4.雙向數(shù)據(jù)流模式(Bidirectional streaming RPC)
2.1 簡單模式
這種模式最為傳統(tǒng),即客戶端發(fā)起一次請求,服務端響應一個數(shù)據(jù),這和大家平時熟悉的RPC沒有什么大的區(qū)別,上兩篇中介紹此模式。
2.2 服務端數(shù)據(jù)流模式
這種模式是客戶端發(fā)起一次請求,服務端返回一段連續(xù)的數(shù)據(jù)流。典型的例子是客戶端向服務端發(fā)送一個股票代碼,服務端就把該股票的實時數(shù)據(jù)源源不斷的返回給客戶端
2.3 客戶端數(shù)據(jù)流模式
與服務端數(shù)據(jù)流模式相反,這次是客戶端源源不斷的向服務端發(fā)送數(shù)據(jù)流,而在發(fā)送結束后,由服務端返回一個響應。典型的例子是物聯(lián)網(wǎng)終端向服務器報送數(shù)據(jù)。
2.4 雙向數(shù)據(jù)流
顧名思義,這是客戶端和服務端都可以向對方發(fā)送數(shù)據(jù)流,這個時候雙方的數(shù)據(jù)可以同時互相發(fā)送,也就是可以實現(xiàn)實時交互。典型的例子是聊天機器人。
3. 上代碼
3.1 代碼目錄
3.2 編寫stream.proto文件
stream是常量,寫在哪一邊,哪一邊就是數(shù)據(jù)流
syntax = "proto3"; option go_package = "./;proto"; service Greeter { // 定義方法,stream是常量,流模式 rpc ServerStream (StreamRequestData) returns (stream StreamResponseData); //服務端流模式,拉消息 rpc ClientStream (stream StreamRequestData) returns (StreamResponseData); //客戶端流模式,推消息 rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData); //雙向流模式,能推能拉 } message StreamRequestData { string data = 1; //編號 } message StreamResponseData { string data = 1; //編號 }
生成go的protobuf文件命令:
cd到proto目錄下
命令:protoc -I . hello.proto --go_out=plugins=grpc:.
3.3 編寫server文件
package main import ( "file_test/grpc_go_stream/proto" "fmt" "net" "sync" "time" "google.golang.org/grpc" ) const port = 8082 type server struct{} func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error { i := 0 for { i++ //業(yè)務代碼 _ = res.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("這是發(fā)給%s的數(shù)據(jù)流", req.Data), }) time.Sleep(time.Second * 1) if i > 10 { break } } return nil } func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error { for { //業(yè)務代碼 res, err := cliStr.Recv() if err != nil { fmt.Println("本次客戶端流數(shù)據(jù)發(fā)送完了:",err) break } fmt.Println("客戶端發(fā)來消息:",res.Data) } return nil } func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error { wg:=sync.WaitGroup{} wg.Add(2) //接受客戶端消息的協(xié)程 go func() { defer wg.Done() for { //業(yè)務代碼 res, err := allStr.Recv() if err != nil { fmt.Println("本次客戶端流數(shù)據(jù)發(fā)送完了:",err) break } fmt.Println("收到客戶端發(fā)來消息:",res.Data) } }() //發(fā)送消息給客戶端的協(xié)程 go func() { defer wg.Done() i := 0 for { i++ //業(yè)務代碼 _ = allStr.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("這是發(fā)給客戶端的數(shù)據(jù)流"), }) time.Sleep(time.Second * 1) if i > 10 { break } } }() wg.Wait() return nil } // 啟動 func start() { // 1.實例化server g := grpc.NewServer() // 2.注冊邏輯到server中 proto.RegisterGreeterServer(g, &server{}) // 3.啟動server lis, err := net.Listen("tcp", "127.0.0.1:8082") if err != nil { panic("監(jiān)聽錯誤:" + err.Error()) } err = g.Serve(lis) if err != nil { panic("啟動錯誤:" + err.Error()) } } func main() { start() }
3.4 編寫client文件
package main import ( "context" "file_test/grpc_go_stream/proto" "fmt" "sync" "time" "google.golang.org/grpc" ) var rpc proto.GreeterClient func serverStreamDemo() { //服務端流模式 res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"}) if err != nil { panic("rpc請求錯誤:"+err.Error()) } for { data,err:=res.Recv() // if err != nil { fmt.Println("客戶端發(fā)送完了:",err) return } fmt.Println("客戶端返回數(shù)據(jù)流值:",data.Data) } } func clientStreamDemo() { //客戶端流模式 cliStr, err := rpc.ClientStream(context.Background()) if err != nil { panic("rpc請求錯誤:" + err.Error()) } i := 0 for { i++ _ = cliStr.Send(&proto.StreamRequestData{ Data: "jeff", }) time.Sleep(time.Second * 1) if i > 10 { break } } } func clientAndServerStreamDemo() { //雙向流模式 allStr, _ := rpc.AllStream(context.Background()) wg := sync.WaitGroup{} wg.Add(1) //接受服務端消息的協(xié)程 go func() { defer wg.Done() for { //業(yè)務代碼 res, err := allStr.Recv() if err != nil { fmt.Println("本次服務端流數(shù)據(jù)發(fā)送完了:", err) break } fmt.Println("收到服務端發(fā)來消息:", res.Data) } }() //發(fā)送消息給服務端的協(xié)程 go func() { defer wg.Done() i := 0 for { i++ //業(yè)務代碼 _ = allStr.Send(&proto.StreamRequestData{ Data: fmt.Sprintf("這是發(fā)給服務端的數(shù)據(jù)流"), }) time.Sleep(time.Second * 1) if i > 10 { break } } }() wg.Wait() } // 啟動 func start() { conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure()) if err != nil { panic("rpc連接錯誤:" + err.Error()) } defer conn.Close() rpc = proto.NewGreeterClient(conn) //初始化 serverStreamDemo() //服務端流模式 clientStreamDemo() //客戶端流模式 clientAndServerStreamDemo() // 雙向流模式 } func main() { start() }
以上就是go實現(xiàn)grpc四種數(shù)據(jù)流模式的詳細內容,更多關于go實現(xiàn)grpc流模式的資料請關注腳本之家其它相關文章!
相關文章
Go?WEB框架使用攔截器驗證用戶登錄狀態(tài)實現(xiàn)
這篇文章主要為大家介紹了Go?WEB框架使用攔截器驗證用戶登錄狀態(tài)實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-07-07基于Go+OpenCV實現(xiàn)人臉識別功能的詳細示例
OpenCV是一個強大的計算機視覺庫,提供了豐富的圖像處理和計算機視覺算法,本文將向你介紹在Mac上安裝OpenCV的步驟,并演示如何使用Go的OpenCV綁定庫進行人臉識別,需要的朋友可以參考下2023-07-07Go語言使用protojson庫實現(xiàn)Protocol Buffers與JSON轉換
本文主要介紹Google開源的工具庫Protojson庫如何Protocol Buffers與JSON進行轉換,以及和標準庫encoding/json的性能對比,需要的朋友可以參考下2023-09-09