Go語言中g(shù)PRC的使用
核心概念理解
1. Protobuf 與服務(wù)定義
gRPC 使用 Protobuf 來定義服務(wù)接口和消息格式。在你的例子中,hello.proto 文件定義了一個名為 HelloService 的服務(wù),它包含一個方法 Hello,該方法接收一個 String 消息并返回一個 String 消息。
service HelloService {
rpc Hello (String) returns (String);
}
這類似于定義一個函數(shù)接口,但它是跨網(wǎng)絡(luò)調(diào)用的。
2. 代碼生成
通過 Protobuf 編譯器和 gRPC 插件,你可以生成客戶端和服務(wù)器的代碼:
protoc --go_out=plugins=grpc:. hello.proto
這行命令會生成:
- 客戶端代碼:包含如何調(diào)用遠程服務(wù)的方法
- 服務(wù)器代碼:包含如何實現(xiàn)服務(wù)的接口定義
3. 服務(wù)端實現(xiàn)
服務(wù)端需要實現(xiàn) HelloServiceServer 接口:
type HelloServiceImpl struct{}
func (p *HelloServiceImpl) Hello(ctx context.Context, args *String) (*String, error) {
reply := &String{Value: "hello:" + args.GetValue()}
return reply, nil
}
這里的 Hello 方法是服務(wù)的具體實現(xiàn),它接收一個字符串參數(shù),添加 “hello:” 前綴后返回。
4. 服務(wù)啟動
服務(wù)端需要啟動 gRPC 服務(wù)器并注冊服務(wù):
func main() {
grpcServer := grpc.NewServer()
RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal(err)
}
grpcServer.Serve(lis)
}
這部分代碼:
- 創(chuàng)建 gRPC 服務(wù)器
- 注冊服務(wù)實現(xiàn)
- 監(jiān)聽 TCP 端口
- 啟動服務(wù)
5. 客戶端調(diào)用
客戶端需要連接到服務(wù)端并調(diào)用服務(wù):
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &String{Value: "hello"})
if err != nil {
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
這部分代碼:
- 連接到服務(wù)端
- 創(chuàng)建客戶端
- 調(diào)用遠程方法
- 處理返回結(jié)果
與標準庫 RPC 的區(qū)別
- 協(xié)議:gRPC 使用 HTTP/2,而標準庫 RPC 使用自定義協(xié)議
- 序列化:gRPC 使用 Protobuf (二進制),標準庫 RPC 使用 JSON 或 Gob
- 性能:gRPC 通常更快,尤其是在高并發(fā)場景
- 跨語言支持:gRPC 支持多種語言,標準庫 RPC 主要支持 Go
- 流模式:gRPC 支持雙向流,標準庫 RPC 不支持
客戶端異步調(diào)用
你提到 gRPC 不直接支持異步調(diào)用,但可以通過 Goroutine 實現(xiàn):
// 客戶端異步調(diào)用示例
go func() {
reply, err := client.Hello(context.Background(), &String{Value: "async"})
if err != nil {
log.Println("Error:", err)
return
}
fmt.Println("Async reply:", reply.GetValue())
}()
gPRC流
1. 為什么需要 gRPC 流?
傳統(tǒng) RPC 調(diào)用是"請求-響應(yīng)"模式:
- 客戶端發(fā)送一個請求
- 服務(wù)器處理請求并返回一個響應(yīng)
- 整個過程是同步的,不適合大數(shù)據(jù)傳輸或?qū)崟r通信
gRPC 流解決了這些問題,它允許:
- 客戶端和服務(wù)器之間持續(xù)交換數(shù)據(jù)
- 支持四種流模式:
- 客戶端流(單向)
- 服務(wù)器流(單向)
- 雙向流
- 一元(普通 RPC,無流)
2. 定義流服務(wù)
在 .proto 文件中,使用 stream 關(guān)鍵字定義流:
service HelloService {
// 普通 RPC(一元)
rpc Hello (String) returns (String);
// 雙向流 RPC
rpc Channel (stream String) returns (stream String);
}
這里的 Channel 方法支持雙向流:
- 客戶端可以持續(xù)發(fā)送
String消息 - 服務(wù)器可以持續(xù)返回
String消息
3. 流接口詳解
生成的代碼中,流接口包含 Send 和 Recv 方法:
// 服務(wù)端流接口
type HelloService_ChannelServer interface {
Send(*String) error // 向客戶端發(fā)送消息
Recv() (*String, error) // 從客戶端接收消息
grpc.ServerStream
}
// 客戶端流接口
type HelloService_ChannelClient interface {
Send(*String) error // 向服務(wù)器發(fā)送消息
Recv() (*String, error) // 從服務(wù)器接收消息
grpc.ClientStream
}
這些方法允許雙向、異步的數(shù)據(jù)交換。
4. 服務(wù)端實現(xiàn)
服務(wù)端實現(xiàn) Channel 方法:
func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
for {
// 接收客戶端消息
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil // 客戶端關(guān)閉流
}
return err // 處理其他錯誤
}
// 處理消息并返回響應(yīng)
reply := &String{Value: "hello:" + args.GetValue()}
err = stream.Send(reply)
if err != nil {
return err
}
}
}
關(guān)鍵點:
stream.Recv()從客戶端接收消息stream.Send()向客戶端發(fā)送消息io.EOF表示客戶端關(guān)閉了流
5. 客戶端實現(xiàn)
客戶端需要啟動兩個 Goroutine 分別處理發(fā)送和接收:
// 創(chuàng)建流
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
// 發(fā)送消息的 Goroutine
go func() {
for {
if err := stream.Send(&String{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second) // 每秒發(fā)送一次
}
}()
// 接收消息的主循環(huán)
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break // 服務(wù)器關(guān)閉流
}
log.Fatal(err)
}
fmt.Println("Received:", reply.GetValue())
}
關(guān)鍵點:
- 發(fā)送和接收是完全獨立的
- 客戶端和服務(wù)器可以隨時發(fā)送數(shù)據(jù)
io.EOF表示服務(wù)器關(guān)閉了流
6. 流的工作模式
gRPC 流有四種工作模式:
1. 一元 RPC(無流)
rpc UnaryMethod(Request) returns (Response) {}
- 客戶端發(fā)送一個請求,服務(wù)器返回一個響應(yīng)
2. 服務(wù)器流
rpc ServerStreamingMethod(Request) returns (stream Response) {}
- 客戶端發(fā)送一個請求
- 服務(wù)器返回一個數(shù)據(jù)流
- 示例:實時數(shù)據(jù)推送
3. 客戶端流
rpc ClientStreamingMethod(stream Request) returns (Response) {}
- 客戶端發(fā)送一個數(shù)據(jù)流
- 服務(wù)器返回一個響應(yīng)
- 示例:大數(shù)據(jù)上傳
4. 雙向流
rpc BidirectionalStreamingMethod(stream Request) returns (stream Response) {}
- 客戶端和服務(wù)器可以同時發(fā)送和接收數(shù)據(jù)流
- 示例:實時聊天、多人游戲
7. 流的特性
- 全雙工通信:客戶端和服務(wù)器可以同時發(fā)送和接收數(shù)據(jù)
- 異步處理:發(fā)送和接收操作不阻塞其他操作
- 高效傳輸:基于 HTTP/2 的多路復用,單個連接支持多個流
- 大數(shù)據(jù)支持:適合傳輸大文件或持續(xù)數(shù)據(jù)流
- 實時性:數(shù)據(jù)可以立即傳輸,無需等待整個消息完成
8. 適用場景
- 實時數(shù)據(jù)推送(股票行情、實時通知)
- 大數(shù)據(jù)上傳/下載
- 實時聊天系統(tǒng)
- 監(jiān)控系統(tǒng)(持續(xù)數(shù)據(jù)流)
- 游戲服務(wù)器(低延遲通信)
9. 注意事項
- 錯誤處理:流可能因網(wǎng)絡(luò)問題或服務(wù)器關(guān)閉而中斷,需要適當處理錯誤
- 資源管理:流使用完后需要關(guān)閉,避免資源泄漏
- 并發(fā)控制:多個 Goroutine 訪問同一個流時需要考慮同步問題
- 消息順序:在雙向流中,發(fā)送和接收的順序可能不同步
gRPC發(fā)布-訂閱
發(fā)布-訂閱(PubSub)模式是一種消息傳遞模式,其中發(fā)送者(發(fā)布者)不會直接將消息發(fā)送給特定的接收者(訂閱者),而是將消息分類發(fā)布到主題中。訂閱者可以訂閱一個或多個主題,只接收他們感興趣的消息。這種模式實現(xiàn)了發(fā)布者和訂閱者之間的解耦,非常適合構(gòu)建分布式系統(tǒng)。
本地發(fā)布-訂閱實現(xiàn)
首先,我們看一下基于 moby/moby/pkg/pubsub 包的本地實現(xiàn):
import (
"github.com/moby/moby/pkg/pubsub"
)
func main() {
// 創(chuàng)建發(fā)布者,設(shè)置超時和隊列大小
p := pubsub.NewPublisher(100*time.Millisecond, 10)
// 訂閱golang主題
golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
return strings.HasPrefix(key, "golang:")
}
return false
})
// 訂閱docker主題
docker := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
return strings.HasPrefix(key, "docker:")
}
return false
})
// 發(fā)布消息
go p.Publish("hi")
go p.Publish("golang: https://golang.org")
go p.Publish("docker: https://www.docker.com/")
// 接收消息
go func() {
fmt.Println("golang topic:", <-golang)
}()
go func() {
fmt.Println("docker topic:", <-docker)
}()
<-make(chan bool) // 保持主程序運行
}
關(guān)鍵點:
pubsub.NewPublisher創(chuàng)建一個發(fā)布者,負責管理主題和分發(fā)消息SubscribeTopic使用謂詞函數(shù)過濾感興趣的主題- 發(fā)布者發(fā)布消息時,所有訂閱該主題的訂閱者都會收到通知
- 這是一個本地實現(xiàn),無法跨網(wǎng)絡(luò)工作
基于gRPC的遠程發(fā)布-訂閱系統(tǒng)
現(xiàn)在,我們將使用gRPC擴展這個系統(tǒng),使其能夠跨網(wǎng)絡(luò)工作。
1. 定義服務(wù)接口
service PubsubService {
rpc Publish (String) returns (String); // 發(fā)布消息
rpc Subscribe (String) returns (stream String); // 訂閱主題
}
這里:
Publish是一個普通的RPC方法,用于發(fā)布消息Subscribe是一個服務(wù)端流方法,允許客戶端持續(xù)接收消息
2. 服務(wù)端實現(xiàn)
type PubsubService struct {
pub *pubsub.Publisher
}
func NewPubsubService() *PubsubService {
return &PubsubService{
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}
// 發(fā)布消息
func (p *PubsubService) Publish(ctx context.Context, arg *String) (*String, error) {
p.pub.Publish(arg.GetValue())
return &String{}, nil
}
// 訂閱主題
func (p *PubsubService) Subscribe(arg *String, stream PubsubService_SubscribeServer) error {
// 訂閱特定主題
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
return strings.HasPrefix(key, arg.GetValue())
}
return false
})
// 將接收到的消息通過流發(fā)送給客戶端
for v := range ch {
if err := stream.Send(&String{Value: v.(string)}); err != nil {
return err
}
}
return nil
}
關(guān)鍵點:
- 服務(wù)端維護一個
pubsub.Publisher實例 Publish方法將消息發(fā)布到本地發(fā)布者Subscribe方法創(chuàng)建一個主題訂閱,并通過gRPC流將消息發(fā)送給客戶端
3. 客戶端發(fā)布消息
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
// 發(fā)布golang主題消息
_, err = client.Publish(context.Background(), &String{Value: "golang: hello Go"})
if err != nil {
log.Fatal(err)
}
// 發(fā)布docker主題消息
_, err = client.Publish(context.Background(), &String{Value: "docker: hello Docker"})
if err != nil {
log.Fatal(err)
}
}
4. 客戶端訂閱消息
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
// 訂閱golang主題
stream, err := client.Subscribe(context.Background(), &String{Value: "golang:"})
if err != nil {
log.Fatal(err)
}
// 持續(xù)接收消息
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
}
工作原理詳解
服務(wù)端:
- 創(chuàng)建一個本地的
pubsub.Publisher實例 Publish方法將接收到的消息發(fā)布到本地發(fā)布者Subscribe方法創(chuàng)建一個主題訂閱,并通過gRPC流將消息實時發(fā)送給客戶端
客戶端發(fā)布:
- 調(diào)用遠程的
Publish方法 - 消息通過gRPC傳輸?shù)椒?wù)端
- 服務(wù)端將消息發(fā)布到本地主題
客戶端訂閱:
- 調(diào)用遠程的
Subscribe方法,建立一個服務(wù)端流 - 服務(wù)端為該客戶端創(chuàng)建一個主題訂閱
- 當有匹配的消息發(fā)布時,服務(wù)端通過流將消息發(fā)送給客戶端
- 客戶端持續(xù)接收并處理這些消息
優(yōu)勢與應(yīng)用場景
這種基于gRPC的發(fā)布-訂閱系統(tǒng)具有以下優(yōu)勢:
- 跨語言支持:gRPC支持多種語言,可以構(gòu)建多語言混合的分布式系統(tǒng)
- 高性能:基于HTTP/2和Protobuf,性能優(yōu)越
- 實時通信:使用流特性實現(xiàn)實時消息推送
- 解耦:發(fā)布者和訂閱者不需要直接交互
- 擴展性:可以輕松擴展為支持多個主題和大量訂閱者
典型應(yīng)用場景包括:
- 實時數(shù)據(jù)推送(如股票行情、新聞更新)
- 微服務(wù)間的事件驅(qū)動通信
- 實時聊天系統(tǒng)
- 監(jiān)控和警報系統(tǒng)
通過這種方式,你可以構(gòu)建一個跨網(wǎng)絡(luò)的、高效的發(fā)布-訂閱系統(tǒng),充分利用gRPC的流特性和類型安全優(yōu)勢。
到此這篇關(guān)于Go語言中g(shù)PRC的使用的文章就介紹到這了,更多相關(guān)Go語言 gPRC內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
windows下使用vscode搭建golang環(huán)境并調(diào)試的過程
這篇文章主要介紹了在windows下使用vscode搭建golang環(huán)境并進行調(diào)試,主要包括安裝方法及環(huán)境變量配置技巧,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-09-09
GOPROXY:解決go get golang.org/x包失敗問題
這篇文章主要介紹了GOPROXY:解決go get golang.org/x包失敗問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
golang使用mTLS實現(xiàn)雙向加密認證http通信
這篇文章主要為大家介紹了golang如何調(diào)用mTLS實現(xiàn)雙向加密認證http通信,文中的示例代碼講解詳細,具有一定的學習價值,需要的小伙伴可以參考下2023-08-08

