Golang微服務(wù)框架Kratos實(shí)現(xiàn)Kafka消息隊(duì)列的方法
Golang微服務(wù)框架Kratos應(yīng)用Kafka消息隊(duì)列
消息隊(duì)列是一種異步的服務(wù)間通信方式,適用于無服務(wù)器和微服務(wù)架構(gòu)。消息在被處理和刪除之前一直存儲在隊(duì)列上。每條消息僅可被一位用戶處理一次。消息隊(duì)列可被用于分離重量級處理、緩沖或批處理工作以及緩解高峰期工作負(fù)載。
消息隊(duì)列是大型分布式系統(tǒng)不可缺少的中間件,也是高并發(fā)系統(tǒng)的基石中間件,所以掌握好消息隊(duì)列MQ就變得極其重要。
在本文當(dāng)中,您將了解到:什么是消息隊(duì)列?什么是Kafka?怎樣在微服務(wù)框架Kratos當(dāng)中應(yīng)用Kafka進(jìn)行業(yè)務(wù)開發(fā)。
什么是消息隊(duì)列
消息隊(duì)列(Message Queue,簡稱MQ)指保存消息的一個(gè)容器,其實(shí)本質(zhì)就是一個(gè)保存數(shù)據(jù)的隊(duì)列。
消息中間件是指利用高效可靠的消息傳遞機(jī)制進(jìn)行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的構(gòu)建。
消息中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用解耦,異步消息,流量削峰等問題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性的系統(tǒng)架構(gòu)。目前使用較多的消息隊(duì)列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、Kafka、NAQ、NATS、Pulsar等。
消息隊(duì)列應(yīng)用場景
消息中間件在互聯(lián)網(wǎng)公司使用得越來越多,主要用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。以下介紹消息隊(duì)列在實(shí)際應(yīng)用中常用的使用場景:異步處理,應(yīng)用解耦,流量削峰和消息通訊。
異步處理
通常的微服務(wù)實(shí)現(xiàn)的接口,都是通過RPC進(jìn)行微服務(wù)、服務(wù)客戶端之間的相互調(diào)用,這是同步阻塞執(zhí)行。有一些業(yè)務(wù),業(yè)務(wù)流程比較耗時(shí)且可以不需要立即返回結(jié)果,還有一些業(yè)務(wù)可以互不干擾的并行執(zhí)行,那么我們就可以將之轉(zhuǎn)為異步,并發(fā)執(zhí)行。從而減少同步接口的請求響應(yīng)時(shí)間,從而提高系統(tǒng)的吞吐量。
以下單為例,用戶下單后需要實(shí)施:生成訂單、贈送活動積分、贈送紅包、發(fā)送下單成功通知等,一系列業(yè)務(wù)處理。假設(shè)三個(gè)業(yè)務(wù)節(jié)點(diǎn)每個(gè)使用100毫秒鐘,不考慮網(wǎng)絡(luò)等其他開銷,則串行方式的時(shí)間是400毫秒,并行的時(shí)間只需要200毫秒。這樣就大大提高了系統(tǒng)的吞吐量。
應(yīng)用解耦
應(yīng)用解耦,顧名思義就是解除應(yīng)用系統(tǒng)之間的耦合依賴。通過消息隊(duì)列,使得每個(gè)應(yīng)用系統(tǒng)不必受其他系統(tǒng)影響,可以更獨(dú)立自主。
以電商系統(tǒng)為例,用戶下單后,訂單系統(tǒng)需要通知積分系統(tǒng)。一般的做法是:訂單系統(tǒng)直接調(diào)用積分系統(tǒng)的接口。這就使得應(yīng)用系統(tǒng)間的耦合特別緊密。如果積分系統(tǒng)無法訪問,則積分處理失敗,從而導(dǎo)致訂單失敗。
加入消息隊(duì)列之后,用戶下單后,訂單系統(tǒng)完成下單業(yè)務(wù)后,將消息寫入消息隊(duì)列,返回用戶訂單下單成功。積分系統(tǒng)通過訂閱下單消息的方式獲取下單通知消息,從而進(jìn)行積分操作。實(shí)現(xiàn)訂單系統(tǒng)與庫存系統(tǒng)的應(yīng)用解耦。如果,在下單時(shí)積分系統(tǒng)系統(tǒng)異常,也不影響用戶正常下單,因?yàn)橄聠魏?,訂單系統(tǒng)寫入消息隊(duì)列就不再關(guān)心其他的后續(xù)操作。
流量削峰
流量削峰也是消息隊(duì)列中的常用場景,一般在秒殺或團(tuán)搶活動中使用廣泛。
以秒殺活動為例,一般會因?yàn)榱髁窟^大,導(dǎo)致流量暴增,應(yīng)用掛掉。為解決這個(gè)問題,一般需要在應(yīng)用前端加入消息隊(duì)列,秒殺業(yè)務(wù)處理系統(tǒng)根據(jù)消息隊(duì)列中的請求信息,再做后續(xù)處理。
如上圖所示,服務(wù)器接收到用戶的請求后,首先寫入消息隊(duì)列,秒殺業(yè)務(wù)處理系統(tǒng)根據(jù)消息隊(duì)列中的請求信息,做后續(xù)業(yè)務(wù)處理。假如消息隊(duì)列長度超過最大數(shù)量,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面。
消息通訊
消息通訊是指應(yīng)用間的數(shù)據(jù)通信。消息隊(duì)列一般都內(nèi)置了高效的通信機(jī)制,因此也可以用在單純的消息通訊上。比如:實(shí)現(xiàn)點(diǎn)對點(diǎn)消息隊(duì)列,或者聊天室等點(diǎn)對點(diǎn)通訊。
以上實(shí)際是消息隊(duì)列的兩種消息模式,點(diǎn)對點(diǎn)或發(fā)布訂閱模式。
什么是 Apache Kafka?
Apache Kafka 是一個(gè)分布式數(shù)據(jù)流處理平臺,可以實(shí)時(shí)發(fā)布、訂閱、存儲和處理數(shù)據(jù)流。它旨在處理多種來源的數(shù)據(jù)流,并將它們交付給多個(gè)消費(fèi)者。簡而言之,它可以移動大量數(shù)據(jù),不僅是從 A 點(diǎn)移到 B 點(diǎn),而是能從 A 到 Z 的多個(gè)點(diǎn)移到任何您想要的位置,并且可以同時(shí)進(jìn)行。
Apache Kafka 可以取代傳統(tǒng)的企業(yè)級消息傳遞系統(tǒng)。它最初是 Linkedin 為處理每天 1.4 萬億條消息而開發(fā)的一個(gè)內(nèi)部系統(tǒng),現(xiàn)已成為應(yīng)用于各式各樣企業(yè)需求的開源數(shù)據(jù)流處理解決方案。
Kafka 的工作原理
Kafka 結(jié)合了兩種消息收發(fā)模型、列隊(duì)和發(fā)布-訂閱,以向客戶提供其各自的主要優(yōu)勢。通過列隊(duì)可以跨多個(gè)使用器實(shí)例分發(fā)數(shù)據(jù)處理,因此具有很高的可擴(kuò)展性。但是,傳統(tǒng)隊(duì)列不支持多訂閱者。發(fā)布-訂閱方法支持多訂閱者,但是由于每條消息傳送給每個(gè)訂閱者,因此無法用于跨多個(gè)工作進(jìn)程發(fā)布工作。Kafka uses 使用分區(qū)日志模型將這兩種解決方案融合在一起。日志是一種有序的記錄,這些日志分成區(qū)段或分區(qū),分別對應(yīng)不同的訂閱者。這意味著,同一個(gè)主題可有多個(gè)訂閱者,分別有各自的分區(qū)以獲得更高的可擴(kuò)展性。最后,Kafka 的模型帶來可重放性,允許多個(gè)相互獨(dú)立的應(yīng)用程序從數(shù)據(jù)流執(zhí)行讀取以便按自己的速率獨(dú)立地工作。
列隊(duì)
發(fā)布-訂閱
Kafka的基本概念
kafka運(yùn)行在集群上,集群包含一個(gè)或多個(gè)服務(wù)器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時(shí)間戳(timestamp)。
kafka有以下一些基本概念:
Producer - 消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
Consumer - 消息消費(fèi)者,是消息的使用方,負(fù)責(zé)消費(fèi)Kafka服務(wù)器上的消息。
Topic - 主題,由用戶定義并配置在Kafka服務(wù)器,用于建立Producer和Consumer之間的訂閱關(guān)系。生產(chǎn)者發(fā)送消息到指定的Topic下,消息者從這個(gè)Topic下消費(fèi)消息。
Partition - 消息分區(qū),一個(gè)topic可以分為多個(gè) partition,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息都會被分配一個(gè)有序的id(offset)。
Broker - 一臺kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker可以容納多個(gè)topic。
Consumer Group - 消費(fèi)者分組,用于歸組同類消費(fèi)者。每個(gè)consumer屬于一個(gè)特定的consumer group,多個(gè)消費(fèi)者可以共同消息一個(gè)Topic下的消息,每個(gè)消費(fèi)者消費(fèi)其中的部分消息,這些消費(fèi)者就組成了一個(gè)分組,擁有同一個(gè)分組名稱,通常也被稱為消費(fèi)者集群。
Offset - 消息在partition中的偏移量。每一條消息在partition都有唯一的偏移量,消息者可以指定偏移量來指定要消費(fèi)的消息。
Docker部署開發(fā)環(huán)境
docker pull bitnami/zookeeper:latest docker pull bitnami/kafka:latest docker run -itd \ --name zookeeper-test \ -p 2181:2181 \ -e ALLOW_ANONYMOUS_LOGIN=yes \ bitnami/zookeeper:latest docker run -itd \ --name kafka-standalone \ --link zookeeper-test \ -p 9092:9092 \ -v /home/data/kafka:/bitnami/kafka \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_LISTENERS=PLAINTEXT://:9092 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper-test:2181 \ -e ALLOW_PLAINTEXT_LISTENER=yes \ --user root \ bitnami/kafka:latest
管理工具
Kratos下如何應(yīng)用Kafka?
我對Kafka做了一個(gè)封裝,要在Kratos下面使用Kafka,首先需要在項(xiàng)目中引用我封裝的兩個(gè)庫:
第一個(gè)庫可以視之為Kafka客戶端的一個(gè)封裝:
go get -u github.com/tx7do/kratos-transport/broker/kafka
這一個(gè)庫是講Kafka的客戶端封裝成一個(gè)Kratos的transport.Server
,該庫依賴上面的庫:
go get -u github.com/tx7do/kratos-transport/transport/kafka
想要在Kratos里面應(yīng)用Kafka,有兩條途徑可以達(dá)成:
- 在
Data
層引用Kafka的Broker
,僅用于發(fā)布(Publish)消息之用,換言之,就是只發(fā)送不接收的單向通訊; - 在
Server
層引用Kafka的Server
,可以發(fā)布(Publish)消息,也可以訂閱(Subscribe)消息,換言之,就是既發(fā)送又接收的全雙工通訊。
接下來我就詳細(xì)的講解應(yīng)用方法:
在Data層引用Kafka的Broker
首先創(chuàng)建Kafka的Broker
:
import ( "github.com/tx7do/kratos-transport/broker" "github.com/tx7do/kratos-transport/broker/kafka" ) func NewKafkaBroker(cfg *conf.Bootstrap) broker.Broker { b := kafka.NewBroker( broker.WithAddress(cfg.Data.Kafka.Addrs...), broker.WithCodec(cfg.Data.Kafka.Codec), ) if b == nil { return nil } _ = b.Init() if err := b.Connect(); err != nil { return nil } return b }
然后,注入到Wire
的ProviderSet
:
package data import "github.com/google/wire" // ProviderSet is data providers. var ProviderSet = wire.NewSet( ... NewKafkaBroker, )
最后,我們就可以在Service
里面調(diào)用了:
package service type ReportService struct { v1.ReportServiceHTTPServer kafkaBroker broker.Broker log *log.Helper } func NewReportService(logger log.Logger, kafkaBroker broker.Broker) *ReportService { l := log.NewHelper(log.With(logger, "module", "report/service/agent-service")) return &ReportService{ log: l, kafkaBroker: kafkaBroker, } } func (s *ReportService) PostReport(_ context.Context, req *v1.PostReportRequest) (*v1.PostReportResponse, error) { _ = s.kafkaBroker.Publish(topic.EventReportData, reportV1.RealTimeWarehousingData{ EventName: &req.EventName, ReportData: &req.Content, CreateTime: util.UnixMilliToStringPtr(trans.Int64(time.Now().UnixMilli())), }) return &v1.PostReportResponse{ Code: 0, Msg: "success", }, nil }
需要注意的是,添加了以上代碼之后,需要使用命令生成Wire的膠水代碼:
go run -mod=mod github.com/google/wire/cmd/wire ./cmd/server
在Server層引用Kafka的Server
首先要創(chuàng)建Server
:
package server import ( ... "github.com/tx7do/kratos-transport/transport/kafka" ) // NewKafkaServer create a kafka server. func NewKafkaServer(cfg *conf.Bootstrap, _ log.Logger, svc *service.SaverService) *kafka.Server { ctx := context.Background() srv := kafka.NewServer( kafka.WithAddress(cfg.Server.Kafka.Addrs), kafka.WithGlobalTracerProvider(), kafka.WithGlobalPropagator(), kafka.WithCodec("json"), ) registerKafkaSubscribers(ctx, srv, svc) return srv } func registerKafkaSubscribers(ctx context.Context, srv *kafka.Server, svc *service.SaverService) { _ = kafka.RegisterSubscriber(srv, ctx, topic.UserReportData, topic.LoggerSaverQueue, false, svc.SaveUserReport, ) _ = kafka.RegisterSubscriber(srv, ctx, topic.EventReportData, topic.LoggerSaverQueue, false, svc.SaveEventReport, ) }
接著,調(diào)用kratos.Server
把Kafka的服務(wù)器注冊到Kratos里去:
func newApp(ll log.Logger, rr registry.Registrar, ks *kafka.Server) *kratos.App { return kratos.New( kratos.ID(Service.GetInstanceId()), kratos.Name(Service.Name), kratos.Version(Service.Version), kratos.Metadata(Service.Metadata), kratos.Logger(ll), kratos.Server( ks, ), kratos.Registrar(rr), ) }
最后,我們就可以在Service
里愉快的玩耍了,在這里,我只演示收到Kafka消息之后立即寫入數(shù)據(jù)庫的操作:
package service type SaverService struct { log *log.Helper statusRepo *data.AcceptStatusRepo realtimeRepo *data.RealtimeWarehousingRepo } func NewSaverService( logger log.Logger, statusRepo *data.AcceptStatusRepo, realtimeRepo *data.RealtimeWarehousingRepo, ) *SaverService { l := log.NewHelper(log.With(logger, "module", "saver/service/logger-service")) return &SaverService{ log: l, statusRepo: statusRepo, realtimeRepo: realtimeRepo, } } func (s *SaverService) SaveUserReport(_ context.Context, _ string, _ broker.Headers, msg *v1.AcceptStatusReportData) error { return s.statusRepo.Create(msg) } func (s *SaverService) SaveEventReport(_ context.Context, _ string, _ broker.Headers, msg *v1.RealTimeWarehousingData) error { return s.realtimeRepo.Create(msg) }
實(shí)例代碼
以上代碼以及接口定義,可以在我的另外一個(gè)開源項(xiàng)目里面找到:
https://github.com/tx7do/kratos-uba
https://gitee.com/tx7do/kratos-uba
以上就是Golang微服務(wù)框架Kratos實(shí)現(xiàn)Kafka消息隊(duì)列的方法的詳細(xì)內(nèi)容,更多關(guān)于Golang Kafka消息隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用Golang的channel交叉打印兩個(gè)數(shù)組的操作
這篇文章主要介紹了使用Golang的channel交叉打印兩個(gè)數(shù)組的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04Go語言中的goroutine和channel如何協(xié)同工作
在Go語言中,goroutine和channel是并發(fā)編程的兩個(gè)核心概念,它們協(xié)同工作以實(shí)現(xiàn)高效、安全的并發(fā)執(zhí)行,本文將詳細(xì)探討goroutine和channel如何協(xié)同工作,以及它們在并發(fā)編程中的作用和優(yōu)勢,需要的朋友可以參考下2024-04-04Golang中crypto/cipher加密標(biāo)準(zhǔn)庫全面指南
本文主要介紹了Golang中crypto/cipher加密標(biāo)準(zhǔn)庫,包括對稱加密、非對稱加密以及使用流加密和塊加密算法,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-02-02