Golang微服務(wù)框架Kratos實(shí)現(xiàn)Kafka消息隊(duì)列的方法
Golang微服務(wù)框架Kratos應(yīng)用Kafka消息隊(duì)列
消息隊(duì)列是一種異步的服務(wù)間通信方式,適用于無(wú)服務(wù)器和微服務(wù)架構(gòu)。消息在被處理和刪除之前一直存儲(chǔ)在隊(duì)列上。每條消息僅可被一位用戶(hù)處理一次。消息隊(duì)列可被用于分離重量級(jí)處理、緩沖或批處理工作以及緩解高峰期工作負(fù)載。
消息隊(duì)列是大型分布式系統(tǒng)不可缺少的中間件,也是高并發(fā)系統(tǒng)的基石中間件,所以掌握好消息隊(duì)列MQ就變得極其重要。
在本文當(dāng)中,您將了解到:什么是消息隊(duì)列?什么是Kafka?怎樣在微服務(wù)框架Kratos當(dāng)中應(yīng)用Kafka進(jìn)行業(yè)務(wù)開(kāi)發(fā)。
什么是消息隊(duì)列
消息隊(duì)列(Message Queue,簡(jiǎn)稱(chēng)MQ)指保存消息的一個(gè)容器,其實(shí)本質(zhì)就是一個(gè)保存數(shù)據(jù)的隊(duì)列。
消息中間件是指利用高效可靠的消息傳遞機(jī)制進(jìn)行與平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的構(gòu)建。
消息中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用解耦,異步消息,流量削峰等問(wèn)題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性的系統(tǒng)架構(gòu)。目前使用較多的消息隊(duì)列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、Kafka、NAQ、NATS、Pulsar等。
消息隊(duì)列應(yīng)用場(chǎng)景
消息中間件在互聯(lián)網(wǎng)公司使用得越來(lái)越多,主要用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。以下介紹消息隊(duì)列在實(shí)際應(yīng)用中常用的使用場(chǎng)景:異步處理,應(yīng)用解耦,流量削峰和消息通訊。
異步處理
通常的微服務(wù)實(shí)現(xiàn)的接口,都是通過(guò)RPC進(jìn)行微服務(wù)、服務(wù)客戶(hù)端之間的相互調(diào)用,這是同步阻塞執(zhí)行。有一些業(yè)務(wù),業(yè)務(wù)流程比較耗時(shí)且可以不需要立即返回結(jié)果,還有一些業(yè)務(wù)可以互不干擾的并行執(zhí)行,那么我們就可以將之轉(zhuǎn)為異步,并發(fā)執(zhí)行。從而減少同步接口的請(qǐng)求響應(yīng)時(shí)間,從而提高系統(tǒng)的吞吐量。

以下單為例,用戶(hù)下單后需要實(shí)施:生成訂單、贈(zèng)送活動(dòng)積分、贈(zèng)送紅包、發(fā)送下單成功通知等,一系列業(yè)務(wù)處理。假設(shè)三個(gè)業(yè)務(wù)節(jié)點(diǎn)每個(gè)使用100毫秒鐘,不考慮網(wǎng)絡(luò)等其他開(kāi)銷(xiāo),則串行方式的時(shí)間是400毫秒,并行的時(shí)間只需要200毫秒。這樣就大大提高了系統(tǒng)的吞吐量。
應(yīng)用解耦
應(yīng)用解耦,顧名思義就是解除應(yīng)用系統(tǒng)之間的耦合依賴(lài)。通過(guò)消息隊(duì)列,使得每個(gè)應(yīng)用系統(tǒng)不必受其他系統(tǒng)影響,可以更獨(dú)立自主。
以電商系統(tǒng)為例,用戶(hù)下單后,訂單系統(tǒng)需要通知積分系統(tǒng)。一般的做法是:訂單系統(tǒng)直接調(diào)用積分系統(tǒng)的接口。這就使得應(yīng)用系統(tǒng)間的耦合特別緊密。如果積分系統(tǒng)無(wú)法訪(fǎng)問(wèn),則積分處理失敗,從而導(dǎo)致訂單失敗。

加入消息隊(duì)列之后,用戶(hù)下單后,訂單系統(tǒng)完成下單業(yè)務(wù)后,將消息寫(xiě)入消息隊(duì)列,返回用戶(hù)訂單下單成功。積分系統(tǒng)通過(guò)訂閱下單消息的方式獲取下單通知消息,從而進(jìn)行積分操作。實(shí)現(xiàn)訂單系統(tǒng)與庫(kù)存系統(tǒng)的應(yīng)用解耦。如果,在下單時(shí)積分系統(tǒng)系統(tǒng)異常,也不影響用戶(hù)正常下單,因?yàn)橄聠魏?,訂單系統(tǒng)寫(xiě)入消息隊(duì)列就不再關(guān)心其他的后續(xù)操作。
流量削峰
流量削峰也是消息隊(duì)列中的常用場(chǎng)景,一般在秒殺或團(tuán)搶活動(dòng)中使用廣泛。
以秒殺活動(dòng)為例,一般會(huì)因?yàn)榱髁窟^(guò)大,導(dǎo)致流量暴增,應(yīng)用掛掉。為解決這個(gè)問(wèn)題,一般需要在應(yīng)用前端加入消息隊(duì)列,秒殺業(yè)務(wù)處理系統(tǒng)根據(jù)消息隊(duì)列中的請(qǐng)求信息,再做后續(xù)處理。

如上圖所示,服務(wù)器接收到用戶(hù)的請(qǐng)求后,首先寫(xiě)入消息隊(duì)列,秒殺業(yè)務(wù)處理系統(tǒng)根據(jù)消息隊(duì)列中的請(qǐng)求信息,做后續(xù)業(yè)務(wù)處理。假如消息隊(duì)列長(zhǎng)度超過(guò)最大數(shù)量,則直接拋棄用戶(hù)請(qǐng)求或跳轉(zhuǎn)到錯(cuò)誤頁(yè)面。
消息通訊
消息通訊是指應(yīng)用間的數(shù)據(jù)通信。消息隊(duì)列一般都內(nèi)置了高效的通信機(jī)制,因此也可以用在單純的消息通訊上。比如:實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)消息隊(duì)列,或者聊天室等點(diǎn)對(duì)點(diǎn)通訊。

以上實(shí)際是消息隊(duì)列的兩種消息模式,點(diǎn)對(duì)點(diǎn)或發(fā)布訂閱模式。
什么是 Apache Kafka?
Apache Kafka 是一個(gè)分布式數(shù)據(jù)流處理平臺(tái),可以實(shí)時(shí)發(fā)布、訂閱、存儲(chǔ)和處理數(shù)據(jù)流。它旨在處理多種來(lái)源的數(shù)據(jù)流,并將它們交付給多個(gè)消費(fèi)者。簡(jiǎn)而言之,它可以移動(dòng)大量數(shù)據(jù),不僅是從 A 點(diǎn)移到 B 點(diǎn),而是能從 A 到 Z 的多個(gè)點(diǎn)移到任何您想要的位置,并且可以同時(shí)進(jìn)行。
Apache Kafka 可以取代傳統(tǒng)的企業(yè)級(jí)消息傳遞系統(tǒng)。它最初是 Linkedin 為處理每天 1.4 萬(wàn)億條消息而開(kāi)發(fā)的一個(gè)內(nèi)部系統(tǒng),現(xiàn)已成為應(yīng)用于各式各樣企業(yè)需求的開(kāi)源數(shù)據(jù)流處理解決方案。
Kafka 的工作原理
Kafka 結(jié)合了兩種消息收發(fā)模型、列隊(duì)和發(fā)布-訂閱,以向客戶(hù)提供其各自的主要優(yōu)勢(shì)。通過(guò)列隊(duì)可以跨多個(gè)使用器實(shí)例分發(fā)數(shù)據(jù)處理,因此具有很高的可擴(kuò)展性。但是,傳統(tǒng)隊(duì)列不支持多訂閱者。發(fā)布-訂閱方法支持多訂閱者,但是由于每條消息傳送給每個(gè)訂閱者,因此無(wú)法用于跨多個(gè)工作進(jìn)程發(fā)布工作。Kafka uses 使用分區(qū)日志模型將這兩種解決方案融合在一起。日志是一種有序的記錄,這些日志分成區(qū)段或分區(qū),分別對(duì)應(yīng)不同的訂閱者。這意味著,同一個(gè)主題可有多個(gè)訂閱者,分別有各自的分區(qū)以獲得更高的可擴(kuò)展性。最后,Kafka 的模型帶來(lái)可重放性,允許多個(gè)相互獨(dú)立的應(yīng)用程序從數(shù)據(jù)流執(zhí)行讀取以便按自己的速率獨(dú)立地工作。
列隊(duì)

發(fā)布-訂閱

Kafka的基本概念
kafka運(yùn)行在集群上,集群包含一個(gè)或多個(gè)服務(wù)器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時(shí)間戳(timestamp)。
kafka有以下一些基本概念:
Producer - 消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶(hù)端。
Consumer - 消息消費(fèi)者,是消息的使用方,負(fù)責(zé)消費(fèi)Kafka服務(wù)器上的消息。
Topic - 主題,由用戶(hù)定義并配置在Kafka服務(wù)器,用于建立Producer和Consumer之間的訂閱關(guān)系。生產(chǎn)者發(fā)送消息到指定的Topic下,消息者從這個(gè)Topic下消費(fèi)消息。
Partition - 消息分區(qū),一個(gè)topic可以分為多個(gè) partition,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。
Broker - 一臺(tái)kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker可以容納多個(gè)topic。
Consumer Group - 消費(fèi)者分組,用于歸組同類(lèi)消費(fèi)者。每個(gè)consumer屬于一個(gè)特定的consumer group,多個(gè)消費(fèi)者可以共同消息一個(gè)Topic下的消息,每個(gè)消費(fèi)者消費(fèi)其中的部分消息,這些消費(fèi)者就組成了一個(gè)分組,擁有同一個(gè)分組名稱(chēng),通常也被稱(chēng)為消費(fèi)者集群。
Offset - 消息在partition中的偏移量。每一條消息在partition都有唯一的偏移量,消息者可以指定偏移量來(lái)指定要消費(fèi)的消息。
Docker部署開(kāi)發(fā)環(huán)境
docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest
docker run -itd \
--name zookeeper-test \
-p 2181:2181 \
-e ALLOW_ANONYMOUS_LOGIN=yes \
bitnami/zookeeper:latest
docker run -itd \
--name kafka-standalone \
--link zookeeper-test \
-p 9092:9092 \
-v /home/data/kafka:/bitnami/kafka \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_LISTENERS=PLAINTEXT://:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper-test:2181 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
--user root \
bitnami/kafka:latest管理工具
Kratos下如何應(yīng)用Kafka?
我對(duì)Kafka做了一個(gè)封裝,要在Kratos下面使用Kafka,首先需要在項(xiàng)目中引用我封裝的兩個(gè)庫(kù):
第一個(gè)庫(kù)可以視之為Kafka客戶(hù)端的一個(gè)封裝:
go get -u github.com/tx7do/kratos-transport/broker/kafka
這一個(gè)庫(kù)是講Kafka的客戶(hù)端封裝成一個(gè)Kratos的transport.Server,該庫(kù)依賴(lài)上面的庫(kù):
go get -u github.com/tx7do/kratos-transport/transport/kafka
想要在Kratos里面應(yīng)用Kafka,有兩條途徑可以達(dá)成:
- 在
Data層引用Kafka的Broker,僅用于發(fā)布(Publish)消息之用,換言之,就是只發(fā)送不接收的單向通訊; - 在
Server層引用Kafka的Server,可以發(fā)布(Publish)消息,也可以訂閱(Subscribe)消息,換言之,就是既發(fā)送又接收的全雙工通訊。
接下來(lái)我就詳細(xì)的講解應(yīng)用方法:
在Data層引用Kafka的Broker
首先創(chuàng)建Kafka的Broker:
import (
"github.com/tx7do/kratos-transport/broker"
"github.com/tx7do/kratos-transport/broker/kafka"
)
func NewKafkaBroker(cfg *conf.Bootstrap) broker.Broker {
b := kafka.NewBroker(
broker.WithAddress(cfg.Data.Kafka.Addrs...),
broker.WithCodec(cfg.Data.Kafka.Codec),
)
if b == nil {
return nil
}
_ = b.Init()
if err := b.Connect(); err != nil {
return nil
}
return b
}然后,注入到Wire的ProviderSet:
package data
import "github.com/google/wire"
// ProviderSet is data providers.
var ProviderSet = wire.NewSet(
...
NewKafkaBroker,
)最后,我們就可以在Service里面調(diào)用了:
package service
type ReportService struct {
v1.ReportServiceHTTPServer
kafkaBroker broker.Broker
log *log.Helper
}
func NewReportService(logger log.Logger, kafkaBroker broker.Broker) *ReportService {
l := log.NewHelper(log.With(logger, "module", "report/service/agent-service"))
return &ReportService{
log: l,
kafkaBroker: kafkaBroker,
}
}
func (s *ReportService) PostReport(_ context.Context, req *v1.PostReportRequest) (*v1.PostReportResponse, error) {
_ = s.kafkaBroker.Publish(topic.EventReportData, reportV1.RealTimeWarehousingData{
EventName: &req.EventName,
ReportData: &req.Content,
CreateTime: util.UnixMilliToStringPtr(trans.Int64(time.Now().UnixMilli())),
})
return &v1.PostReportResponse{
Code: 0,
Msg: "success",
}, nil
}需要注意的是,添加了以上代碼之后,需要使用命令生成Wire的膠水代碼:
go run -mod=mod github.com/google/wire/cmd/wire ./cmd/server
在Server層引用Kafka的Server
首先要?jiǎng)?chuàng)建Server:
package server
import (
...
"github.com/tx7do/kratos-transport/transport/kafka"
)
// NewKafkaServer create a kafka server.
func NewKafkaServer(cfg *conf.Bootstrap, _ log.Logger, svc *service.SaverService) *kafka.Server {
ctx := context.Background()
srv := kafka.NewServer(
kafka.WithAddress(cfg.Server.Kafka.Addrs),
kafka.WithGlobalTracerProvider(),
kafka.WithGlobalPropagator(),
kafka.WithCodec("json"),
)
registerKafkaSubscribers(ctx, srv, svc)
return srv
}
func registerKafkaSubscribers(ctx context.Context, srv *kafka.Server, svc *service.SaverService) {
_ = kafka.RegisterSubscriber(srv, ctx,
topic.UserReportData, topic.LoggerSaverQueue, false,
svc.SaveUserReport,
)
_ = kafka.RegisterSubscriber(srv, ctx,
topic.EventReportData, topic.LoggerSaverQueue, false,
svc.SaveEventReport,
)
}接著,調(diào)用kratos.Server把Kafka的服務(wù)器注冊(cè)到Kratos里去:
func newApp(ll log.Logger, rr registry.Registrar, ks *kafka.Server) *kratos.App {
return kratos.New(
kratos.ID(Service.GetInstanceId()),
kratos.Name(Service.Name),
kratos.Version(Service.Version),
kratos.Metadata(Service.Metadata),
kratos.Logger(ll),
kratos.Server(
ks,
),
kratos.Registrar(rr),
)
}最后,我們就可以在Service里愉快的玩耍了,在這里,我只演示收到Kafka消息之后立即寫(xiě)入數(shù)據(jù)庫(kù)的操作:
package service
type SaverService struct {
log *log.Helper
statusRepo *data.AcceptStatusRepo
realtimeRepo *data.RealtimeWarehousingRepo
}
func NewSaverService(
logger log.Logger,
statusRepo *data.AcceptStatusRepo,
realtimeRepo *data.RealtimeWarehousingRepo,
) *SaverService {
l := log.NewHelper(log.With(logger, "module", "saver/service/logger-service"))
return &SaverService{
log: l,
statusRepo: statusRepo,
realtimeRepo: realtimeRepo,
}
}
func (s *SaverService) SaveUserReport(_ context.Context, _ string, _ broker.Headers, msg *v1.AcceptStatusReportData) error {
return s.statusRepo.Create(msg)
}
func (s *SaverService) SaveEventReport(_ context.Context, _ string, _ broker.Headers, msg *v1.RealTimeWarehousingData) error {
return s.realtimeRepo.Create(msg)
}實(shí)例代碼
以上代碼以及接口定義,可以在我的另外一個(gè)開(kāi)源項(xiàng)目里面找到:
https://github.com/tx7do/kratos-uba
https://gitee.com/tx7do/kratos-uba
以上就是Golang微服務(wù)框架Kratos實(shí)現(xiàn)Kafka消息隊(duì)列的方法的詳細(xì)內(nèi)容,更多關(guān)于Golang Kafka消息隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Go語(yǔ)言的隊(duì)列和堆棧實(shí)現(xiàn)方法
- 用golang實(shí)現(xiàn)一個(gè)定時(shí)器任務(wù)隊(duì)列實(shí)例
- Django使用Celery異步任務(wù)隊(duì)列的使用
- golang實(shí)現(xiàn)redis的延時(shí)消息隊(duì)列功能示例
- Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列
- 基于Golang實(shí)現(xiàn)延遲隊(duì)列(DelayQueue)
- Go高級(jí)特性探究之優(yōu)先級(jí)隊(duì)列詳解
- Go語(yǔ)言隊(duì)列的四種實(shí)現(xiàn)及使用場(chǎng)景
相關(guān)文章
go語(yǔ)言通過(guò)管道連接兩個(gè)命令行進(jìn)程的方法
這篇文章主要介紹了go語(yǔ)言通過(guò)管道連接兩個(gè)命令行進(jìn)程的方法,實(shí)例分析了Go語(yǔ)言操作命令行進(jìn)程的技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03
Go 簡(jiǎn)單實(shí)現(xiàn)多租戶(hù)數(shù)據(jù)庫(kù)隔離
本文主要介紹了Go 簡(jiǎn)單實(shí)現(xiàn)多租戶(hù)數(shù)據(jù)庫(kù)隔離,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05

