Go語言中gPRC的使用
核心概念理解
1. Protobuf 與服務定義
gRPC 使用 Protobuf 來定義服務接口和消息格式。在你的例子中,hello.proto
文件定義了一個名為 HelloService
的服務,它包含一個方法 Hello
,該方法接收一個 String
消息并返回一個 String
消息。
service HelloService { rpc Hello (String) returns (String); }
這類似于定義一個函數(shù)接口,但它是跨網(wǎng)絡調用的。
2. 代碼生成
通過 Protobuf 編譯器和 gRPC 插件,你可以生成客戶端和服務器的代碼:
protoc --go_out=plugins=grpc:. hello.proto
這行命令會生成:
- 客戶端代碼:包含如何調用遠程服務的方法
- 服務器代碼:包含如何實現(xiàn)服務的接口定義
3. 服務端實現(xiàn)
服務端需要實現(xiàn) HelloServiceServer
接口:
type HelloServiceImpl struct{} func (p *HelloServiceImpl) Hello(ctx context.Context, args *String) (*String, error) { reply := &String{Value: "hello:" + args.GetValue()} return reply, nil }
這里的 Hello
方法是服務的具體實現(xiàn),它接收一個字符串參數(shù),添加 “hello:” 前綴后返回。
4. 服務啟動
服務端需要啟動 gRPC 服務器并注冊服務:
func main() { grpcServer := grpc.NewServer() RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl)) lis, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal(err) } grpcServer.Serve(lis) }
這部分代碼:
- 創(chuàng)建 gRPC 服務器
- 注冊服務實現(xiàn)
- 監(jiān)聽 TCP 端口
- 啟動服務
5. 客戶端調用
客戶端需要連接到服務端并調用服務:
func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := NewHelloServiceClient(conn) reply, err := client.Hello(context.Background(), &String{Value: "hello"}) if err != nil { log.Fatal(err) } fmt.Println(reply.GetValue()) }
這部分代碼:
- 連接到服務端
- 創(chuàng)建客戶端
- 調用遠程方法
- 處理返回結果
與標準庫 RPC 的區(qū)別
- 協(xié)議:gRPC 使用 HTTP/2,而標準庫 RPC 使用自定義協(xié)議
- 序列化:gRPC 使用 Protobuf (二進制),標準庫 RPC 使用 JSON 或 Gob
- 性能:gRPC 通常更快,尤其是在高并發(fā)場景
- 跨語言支持:gRPC 支持多種語言,標準庫 RPC 主要支持 Go
- 流模式:gRPC 支持雙向流,標準庫 RPC 不支持
客戶端異步調用
你提到 gRPC 不直接支持異步調用,但可以通過 Goroutine 實現(xiàn):
// 客戶端異步調用示例 go func() { reply, err := client.Hello(context.Background(), &String{Value: "async"}) if err != nil { log.Println("Error:", err) return } fmt.Println("Async reply:", reply.GetValue()) }()
gPRC流
1. 為什么需要 gRPC 流?
傳統(tǒng) RPC 調用是"請求-響應"模式:
- 客戶端發(fā)送一個請求
- 服務器處理請求并返回一個響應
- 整個過程是同步的,不適合大數(shù)據(jù)傳輸或實時通信
gRPC 流解決了這些問題,它允許:
- 客戶端和服務器之間持續(xù)交換數(shù)據(jù)
- 支持四種流模式:
- 客戶端流(單向)
- 服務器流(單向)
- 雙向流
- 一元(普通 RPC,無流)
2. 定義流服務
在 .proto
文件中,使用 stream
關鍵字定義流:
service HelloService { // 普通 RPC(一元) rpc Hello (String) returns (String); // 雙向流 RPC rpc Channel (stream String) returns (stream String); }
這里的 Channel
方法支持雙向流:
- 客戶端可以持續(xù)發(fā)送
String
消息 - 服務器可以持續(xù)返回
String
消息
3. 流接口詳解
生成的代碼中,流接口包含 Send
和 Recv
方法:
// 服務端流接口 type HelloService_ChannelServer interface { Send(*String) error // 向客戶端發(fā)送消息 Recv() (*String, error) // 從客戶端接收消息 grpc.ServerStream } // 客戶端流接口 type HelloService_ChannelClient interface { Send(*String) error // 向服務器發(fā)送消息 Recv() (*String, error) // 從服務器接收消息 grpc.ClientStream }
這些方法允許雙向、異步的數(shù)據(jù)交換。
4. 服務端實現(xiàn)
服務端實現(xiàn) Channel
方法:
func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error { for { // 接收客戶端消息 args, err := stream.Recv() if err != nil { if err == io.EOF { return nil // 客戶端關閉流 } return err // 處理其他錯誤 } // 處理消息并返回響應 reply := &String{Value: "hello:" + args.GetValue()} err = stream.Send(reply) if err != nil { return err } } }
關鍵點:
stream.Recv()
從客戶端接收消息stream.Send()
向客戶端發(fā)送消息io.EOF
表示客戶端關閉了流
5. 客戶端實現(xiàn)
客戶端需要啟動兩個 Goroutine 分別處理發(fā)送和接收:
// 創(chuàng)建流 stream, err := client.Channel(context.Background()) if err != nil { log.Fatal(err) } // 發(fā)送消息的 Goroutine go func() { for { if err := stream.Send(&String{Value: "hi"}); err != nil { log.Fatal(err) } time.Sleep(time.Second) // 每秒發(fā)送一次 } }() // 接收消息的主循環(huán) for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break // 服務器關閉流 } log.Fatal(err) } fmt.Println("Received:", reply.GetValue()) }
關鍵點:
- 發(fā)送和接收是完全獨立的
- 客戶端和服務器可以隨時發(fā)送數(shù)據(jù)
io.EOF
表示服務器關閉了流
6. 流的工作模式
gRPC 流有四種工作模式:
1. 一元 RPC(無流)
rpc UnaryMethod(Request) returns (Response) {}
- 客戶端發(fā)送一個請求,服務器返回一個響應
2. 服務器流
rpc ServerStreamingMethod(Request) returns (stream Response) {}
- 客戶端發(fā)送一個請求
- 服務器返回一個數(shù)據(jù)流
- 示例:實時數(shù)據(jù)推送
3. 客戶端流
rpc ClientStreamingMethod(stream Request) returns (Response) {}
- 客戶端發(fā)送一個數(shù)據(jù)流
- 服務器返回一個響應
- 示例:大數(shù)據(jù)上傳
4. 雙向流
rpc BidirectionalStreamingMethod(stream Request) returns (stream Response) {}
- 客戶端和服務器可以同時發(fā)送和接收數(shù)據(jù)流
- 示例:實時聊天、多人游戲
7. 流的特性
- 全雙工通信:客戶端和服務器可以同時發(fā)送和接收數(shù)據(jù)
- 異步處理:發(fā)送和接收操作不阻塞其他操作
- 高效傳輸:基于 HTTP/2 的多路復用,單個連接支持多個流
- 大數(shù)據(jù)支持:適合傳輸大文件或持續(xù)數(shù)據(jù)流
- 實時性:數(shù)據(jù)可以立即傳輸,無需等待整個消息完成
8. 適用場景
- 實時數(shù)據(jù)推送(股票行情、實時通知)
- 大數(shù)據(jù)上傳/下載
- 實時聊天系統(tǒng)
- 監(jiān)控系統(tǒng)(持續(xù)數(shù)據(jù)流)
- 游戲服務器(低延遲通信)
9. 注意事項
- 錯誤處理:流可能因網(wǎng)絡問題或服務器關閉而中斷,需要適當處理錯誤
- 資源管理:流使用完后需要關閉,避免資源泄漏
- 并發(fā)控制:多個 Goroutine 訪問同一個流時需要考慮同步問題
- 消息順序:在雙向流中,發(fā)送和接收的順序可能不同步
gRPC發(fā)布-訂閱
發(fā)布-訂閱(PubSub)模式是一種消息傳遞模式,其中發(fā)送者(發(fā)布者)不會直接將消息發(fā)送給特定的接收者(訂閱者),而是將消息分類發(fā)布到主題中。訂閱者可以訂閱一個或多個主題,只接收他們感興趣的消息。這種模式實現(xiàn)了發(fā)布者和訂閱者之間的解耦,非常適合構建分布式系統(tǒng)。
本地發(fā)布-訂閱實現(xiàn)
首先,我們看一下基于 moby/moby/pkg/pubsub
包的本地實現(xiàn):
import ( "github.com/moby/moby/pkg/pubsub" ) func main() { // 創(chuàng)建發(fā)布者,設置超時和隊列大小 p := pubsub.NewPublisher(100*time.Millisecond, 10) // 訂閱golang主題 golang := p.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { return strings.HasPrefix(key, "golang:") } return false }) // 訂閱docker主題 docker := p.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { return strings.HasPrefix(key, "docker:") } return false }) // 發(fā)布消息 go p.Publish("hi") go p.Publish("golang: https://golang.org") go p.Publish("docker: https://www.docker.com/") // 接收消息 go func() { fmt.Println("golang topic:", <-golang) }() go func() { fmt.Println("docker topic:", <-docker) }() <-make(chan bool) // 保持主程序運行 }
關鍵點:
pubsub.NewPublisher
創(chuàng)建一個發(fā)布者,負責管理主題和分發(fā)消息SubscribeTopic
使用謂詞函數(shù)過濾感興趣的主題- 發(fā)布者發(fā)布消息時,所有訂閱該主題的訂閱者都會收到通知
- 這是一個本地實現(xiàn),無法跨網(wǎng)絡工作
基于gRPC的遠程發(fā)布-訂閱系統(tǒng)
現(xiàn)在,我們將使用gRPC擴展這個系統(tǒng),使其能夠跨網(wǎng)絡工作。
1. 定義服務接口
service PubsubService { rpc Publish (String) returns (String); // 發(fā)布消息 rpc Subscribe (String) returns (stream String); // 訂閱主題 }
這里:
Publish
是一個普通的RPC方法,用于發(fā)布消息Subscribe
是一個服務端流方法,允許客戶端持續(xù)接收消息
2. 服務端實現(xiàn)
type PubsubService struct { pub *pubsub.Publisher } func NewPubsubService() *PubsubService { return &PubsubService{ pub: pubsub.NewPublisher(100*time.Millisecond, 10), } } // 發(fā)布消息 func (p *PubsubService) Publish(ctx context.Context, arg *String) (*String, error) { p.pub.Publish(arg.GetValue()) return &String{}, nil } // 訂閱主題 func (p *PubsubService) Subscribe(arg *String, stream PubsubService_SubscribeServer) error { // 訂閱特定主題 ch := p.pub.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { return strings.HasPrefix(key, arg.GetValue()) } return false }) // 將接收到的消息通過流發(fā)送給客戶端 for v := range ch { if err := stream.Send(&String{Value: v.(string)}); err != nil { return err } } return nil }
關鍵點:
- 服務端維護一個
pubsub.Publisher
實例 Publish
方法將消息發(fā)布到本地發(fā)布者Subscribe
方法創(chuàng)建一個主題訂閱,并通過gRPC流將消息發(fā)送給客戶端
3. 客戶端發(fā)布消息
func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := NewPubsubServiceClient(conn) // 發(fā)布golang主題消息 _, err = client.Publish(context.Background(), &String{Value: "golang: hello Go"}) if err != nil { log.Fatal(err) } // 發(fā)布docker主題消息 _, err = client.Publish(context.Background(), &String{Value: "docker: hello Docker"}) if err != nil { log.Fatal(err) } }
4. 客戶端訂閱消息
func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := NewPubsubServiceClient(conn) // 訂閱golang主題 stream, err := client.Subscribe(context.Background(), &String{Value: "golang:"}) if err != nil { log.Fatal(err) } // 持續(xù)接收消息 for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println(reply.GetValue()) } }
工作原理詳解
服務端:
- 創(chuàng)建一個本地的
pubsub.Publisher
實例 Publish
方法將接收到的消息發(fā)布到本地發(fā)布者Subscribe
方法創(chuàng)建一個主題訂閱,并通過gRPC流將消息實時發(fā)送給客戶端
客戶端發(fā)布:
- 調用遠程的
Publish
方法 - 消息通過gRPC傳輸?shù)椒斩?/li>
- 服務端將消息發(fā)布到本地主題
客戶端訂閱:
- 調用遠程的
Subscribe
方法,建立一個服務端流 - 服務端為該客戶端創(chuàng)建一個主題訂閱
- 當有匹配的消息發(fā)布時,服務端通過流將消息發(fā)送給客戶端
- 客戶端持續(xù)接收并處理這些消息
優(yōu)勢與應用場景
這種基于gRPC的發(fā)布-訂閱系統(tǒng)具有以下優(yōu)勢:
- 跨語言支持:gRPC支持多種語言,可以構建多語言混合的分布式系統(tǒng)
- 高性能:基于HTTP/2和Protobuf,性能優(yōu)越
- 實時通信:使用流特性實現(xiàn)實時消息推送
- 解耦:發(fā)布者和訂閱者不需要直接交互
- 擴展性:可以輕松擴展為支持多個主題和大量訂閱者
典型應用場景包括:
- 實時數(shù)據(jù)推送(如股票行情、新聞更新)
- 微服務間的事件驅動通信
- 實時聊天系統(tǒng)
- 監(jiān)控和警報系統(tǒng)
通過這種方式,你可以構建一個跨網(wǎng)絡的、高效的發(fā)布-訂閱系統(tǒng),充分利用gRPC的流特性和類型安全優(yōu)勢。
到此這篇關于Go語言中gPRC的使用的文章就介紹到這了,更多相關Go語言 gPRC內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
windows下使用vscode搭建golang環(huán)境并調試的過程
這篇文章主要介紹了在windows下使用vscode搭建golang環(huán)境并進行調試,主要包括安裝方法及環(huán)境變量配置技巧,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-09-09GOPROXY:解決go get golang.org/x包失敗問題
這篇文章主要介紹了GOPROXY:解決go get golang.org/x包失敗問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01golang使用mTLS實現(xiàn)雙向加密認證http通信
這篇文章主要為大家介紹了golang如何調用mTLS實現(xiàn)雙向加密認證http通信,文中的示例代碼講解詳細,具有一定的學習價值,需要的小伙伴可以參考下2023-08-08