Go操作各大消息隊列教程(RabbitMQ、Kafka)
1 RabbitMQ
1.1 概念
①基本名詞
當前市面上mq的產(chǎn)品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐獻給Apache的RocketMQ。甚至連redis這種NoSQL都支持MQ的功能。
- Broker:表示消息隊列服務實體
- Virtual Host:虛擬主機。標識一批交換機、消息隊列和相關對象。vhost是AMQP概念的基礎,必須在鏈接時指定,RabbitMQ默認的vhost是 /。
- AMQP(Advanced Message Queuing Protocol)高級消息隊列協(xié)議
- Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務器中的隊列。
- Queue:消息隊列,用來保存消息直到發(fā)送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
②常見模式
1. simple簡單模式
消息的消費者(consumer) 監(jiān)聽(while) 消息隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經(jīng)從隊列中消失了,造成消息的丟失)
2. worker工作模式
多個消費者從一個隊列中爭搶消息
- (隱患,高并發(fā)情況下,默認會產(chǎn)生某一個消息被多個消費者共同使用,可以設置一個開關(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個消費者使用)
- 應用場景:紅包;大項目中的資源調度(任務分配系統(tǒng)不需知道哪一個任務執(zhí)行系統(tǒng)在空閑,直接將任務扔到消息隊列中,空閑的系統(tǒng)自動爭搶)
3. publish/subscribe發(fā)布訂閱(共享資源)
消費者訂閱消息,然后從訂閱的隊列中獲取消息進行消費。
- X代表交換機rabbitMQ內部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消息放入交換機,交換機發(fā)布訂閱把消息發(fā)送到所有消息隊列中,對應消息隊列的消費者拿到消息進行消費
- 相關場景:郵件群發(fā),群聊天,廣播(廣告)
4. routing路由模式
- 交換機根據(jù)路由規(guī)則,將消息路由到不同的隊列中
- 消息生產(chǎn)者將消息發(fā)送給交換機按照路由判斷,路由是字符串(info) 當前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機根據(jù)路由的key,只能匹配上路由key對應的消息隊列,對應的消費者才能消費消息;
5. topic主題模式(路由模式的一種)
- 星號井號代表通配符
- 星號代表多個單詞,井號代表一個單詞
- 路由功能添加模糊匹配
- 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機
- 交換機根據(jù)key的規(guī)則模糊匹配到對應的隊列,由隊列的監(jiān)聽消費者接收消息消費
1.2 搭建(docker方式)
①拉取鏡像
# 拉取鏡像 docker pull rabbitmq:3.7-management
②創(chuàng)建并啟動容器
# 創(chuàng)建并運行容器 docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management #5672是項目中連接rabbitmq的端口(我這里映射的是5672),15672是rabbitmq的web管理界面端口(我映射為15672) # 輸入網(wǎng)址http://ip:15672即可進入rabbitmq的web管理頁面,賬戶密碼:guest / guest
③web界面創(chuàng)建用戶和virtual host
下面為了我們后續(xù)的操作,首先我們新建一個Virtual Host并且給他分配一個用戶名,用來隔離數(shù)據(jù),根據(jù)自己需要自行創(chuàng)建
新增virtual host
新增用戶
點擊新建好的用戶,設置其host
最終效果
1.3 代碼操作
①RabbitMQ struct:包含創(chuàng)建、消費、生產(chǎn)消息
package RabbitMQ import ( "fmt" "github.com/streadway/amqp" "log" ) //amqp:// 賬號 密碼@地址:端口號/vhost const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi" type RabbitMQ struct { //連接 conn *amqp.Connection //管道 channel *amqp.Channel //隊列名稱 QueueName string //交換機 Exchange string //key Simple模式 幾乎用不到 Key string //連接信息 Mqurl string } //創(chuàng)建RabbitMQ結構體實例 func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL} var err error //創(chuàng)建rabbitmq連接 rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "創(chuàng)建連接錯誤!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "獲取channel失敗") return rabbitmq } //斷開channel和connection func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() } //錯誤處理函數(shù) func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { log.Fatalf("%s:%s", message, err) panic(fmt.Sprintf("%s:%s", message, err)) } } //簡單模式step:1。創(chuàng)建簡單模式下RabbitMQ實例 func NewRabbitMQSimple(queueName string) *RabbitMQ { return NewRabbitMQ(queueName, "", "") } //訂閱模式創(chuàng)建rabbitmq實例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ { //創(chuàng)建rabbitmq實例 rabbitmq := NewRabbitMQ("", exchangeName, "") var err error //獲取connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connecct rabbitmq!") //獲取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel!") return rabbitmq } //訂閱模式生成 func (r *RabbitMQ) PublishPub(message string) { //嘗試創(chuàng)建交換機,不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "fanout", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //2 發(fā)送消息 err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } //訂閱模式消費端代碼 func (r *RabbitMQ) RecieveSub() { //嘗試創(chuàng)建交換機,不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "fanout", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //2試探性創(chuàng)建隊列,創(chuàng)建隊列 q, err := r.channel.QueueDeclare( "", //隨機生產(chǎn)隊列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這里的key要為空 "", r.Exchange, false, nil, ) //消費消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <-forever } //話題模式 創(chuàng)建RabbitMQ實例 func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ { //創(chuàng)建rabbitmq實例 rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connect rabbingmq!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //話題模式發(fā)送信息 func (r *RabbitMQ) PublishTopic(message string) { //嘗試創(chuàng)建交換機,不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 話題模式 "topic", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "topic failed to declare an excha"+"nge") //2發(fā)送信息 err = r.channel.Publish( r.Exchange, //要設置 r.Key, false, false, amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } //話題模式接收信息 //要注意key //其中* 用于匹配一個單詞,#用于匹配多個單詞(可以是零個) //匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到 func (r *RabbitMQ) RecieveTopic() { //嘗試創(chuàng)建交換機,不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 話題模式 "topic", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an exchange") //2試探性創(chuàng)建隊列,創(chuàng)建隊列 q, err := r.channel.QueueDeclare( "", //隨機生產(chǎn)隊列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這里的key要為空 r.Key, r.Exchange, false, nil, ) //消費消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <-forever } //路由模式 創(chuàng)建RabbitMQ實例 func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ { //創(chuàng)建rabbitmq實例 rabbitmq := NewRabbitMQ("", exchagne, routingKey) var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "failed to connect rabbingmq!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "failed to open a channel") return rabbitmq } //路由模式發(fā)送信息 func (r *RabbitMQ) PublishRouting(message string) { //嘗試創(chuàng)建交換機,不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "direct", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //發(fā)送信息 err = r.channel.Publish( r.Exchange, //要設置 r.Key, false, false, amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } //路由模式接收信息 func (r *RabbitMQ) RecieveRouting() { //嘗試創(chuàng)建交換機,不存在創(chuàng)建 err := r.channel.ExchangeDeclare( //交換機名稱 r.Exchange, //交換機類型 廣播類型 "direct", //是否持久化 true, //是否字段刪除 false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定 false, //是否阻塞 true表示要等待服務器的響應 false, nil, ) r.failOnErr(err, "failed to declare an excha"+"nge") //2試探性創(chuàng)建隊列,創(chuàng)建隊列 q, err := r.channel.QueueDeclare( "", //隨機生產(chǎn)隊列名稱 false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") //綁定隊列到exchange中 err = r.channel.QueueBind( q.Name, //在pub/sub模式下,這里的key要為空 r.Key, r.Exchange, false, nil, ) //消費消息 message, err := r.channel.Consume( q.Name, "", true, false, false, false, nil, ) forever := make(chan bool) go func() { for d := range message { log.Printf("Received a message:%s,", d.Body) } }() fmt.Println("退出請按 Ctrl+C") <-forever } //簡單模式Step:2、簡單模式下生產(chǎn)代碼 func (r *RabbitMQ) PublishSimple(message string) { //1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建 //優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中 _, err := r.channel.QueueDeclare( //隊列名稱 r.QueueName, //是否持久化 false, //是否為自動刪除 當最后一個消費者斷開連接之后,是否把消息從隊列中刪除 false, //是否具有排他性 true表示自己可見 其他用戶不能訪問 false, //是否阻塞 true表示要等待服務器的響應 false, //額外數(shù)據(jù) nil, ) if err != nil { fmt.Println(err) } //2.發(fā)送消息到隊列中 r.channel.Publish( //默認的Exchange交換機是default,類型是direct直接類型 r.Exchange, //要賦值的隊列名稱 r.QueueName, //如果為true,根據(jù)exchange類型和routkey規(guī)則,如果無法找到符合條件的隊列那么會把發(fā)送的消息返回給發(fā)送者 false, //如果為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有綁定消費者,則會把消息還給發(fā)送者 false, //消息 amqp.Publishing{ //類型 ContentType: "text/plain", //消息 Body: []byte(message), }) } func (r *RabbitMQ) ConsumeSimple() { //1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建 //優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中 _, err := r.channel.QueueDeclare( //隊列名稱 r.QueueName, //是否持久化 false, //是否為自動刪除 當最后一個消費者斷開連接之后,是否把消息從隊列中刪除 false, //是否具有排他性 false, //是否阻塞 false, //額外數(shù)據(jù) nil, ) if err != nil { fmt.Println(err) } //接收消息 msgs, err := r.channel.Consume( r.QueueName, //用來區(qū)分多個消費者 "", //是否自動應答 true, //是否具有排他性 false, //如果設置為true,表示不能同一個connection中發(fā)送的消息傳遞給這個connection中的消費者 false, //隊列是否阻塞 false, nil, ) if err != nil { fmt.Println(err) } forever := make(chan bool) //啟用協(xié)程處理 go func() { for d := range msgs { //實現(xiàn)我們要處理的邏輯函數(shù) log.Printf("Received a message:%s", d.Body) //fmt.Println(d.Body) } }() log.Printf("【*】warting for messages, To exit press CCTRAL+C") <-forever } func (r *RabbitMQ) ConsumeWorker(consumerName string) { //1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建 //優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中 _, err := r.channel.QueueDeclare( //隊列名稱 r.QueueName, //是否持久化 false, //是否為自動刪除 當最后一個消費者斷開連接之后,是否把消息從隊列中刪除 false, //是否具有排他性 false, //是否阻塞 false, //額外數(shù)據(jù) nil, ) if err != nil { fmt.Println(err) } //接收消息 msgs, err := r.channel.Consume( r.QueueName, //用來區(qū)分多個消費者 consumerName, //是否自動應答 true, //是否具有排他性 false, //如果設置為true,表示不能同一個connection中發(fā)送的消息傳遞給這個connection中的消費者 false, //隊列是否阻塞 false, nil, ) if err != nil { fmt.Println(err) } forever := make(chan bool) //啟用協(xié)程處理 go func() { for d := range msgs { //實現(xiàn)我們要處理的邏輯函數(shù) log.Printf("%s Received a message:%s", consumerName, d.Body) //fmt.Println(d.Body) } }() log.Printf("【*】warting for messages, To exit press CCTRAL+C") <-forever }
②測試代碼
1. simple簡單模式
consumer.go
func main() { //消費者 rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple") rabbitmq.ConsumeSimple() }
producer.go
func main() { //Simple模式 生產(chǎn)者 rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple") for i := 0; i < 5; i++ { time.Sleep(time.Second * 2) rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i)) } }
2. worker模式
consumer.go
func main() { /* worker模式無非就是多個消費者去同一個隊列中消費消息 */ //消費者1 rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker") go rabbitmq1.ConsumeWorker("consumer1") //消費者2 rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker") rabbitmq2.ConsumeWorker("consumer2") }
producer.go
func main() { //Worker模式 生產(chǎn)者 rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker") for i := 0; i < 100; i++ { //time.Sleep(time.Second * 2) rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i)) } }
3. publish/subscribe模式
consumer.go:
func main() { //消費者 rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") rabbitmq.RecieveSub() }
producer.go
func main() { //訂閱模式發(fā)送者 rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct") for i := 0; i <= 20; i++ { rabbitmq.PublishPub("訂閱模式生產(chǎn)第" + strconv.Itoa(i) + "條數(shù)據(jù)") fmt.Println(i) time.Sleep(1 * time.Second) } }
4. router模式
consumer.go
func main() { //消費者 rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one") rabbitmq.RecieveRouting() }
producer.go
func main() { //路由模式生產(chǎn)者 imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one") imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two") for i := 0; i <= 10; i++ { imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i)) imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } }
5. topic模式
consumer.go
func main() { /* 星號井號代表通配符 星號代表多個單詞,井號代表一個單詞 路由功能添加模糊匹配 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機 交換機根據(jù)key的規(guī)則模糊匹配到對應的隊列,由隊列的監(jiān)聽消費者接收消息消費 */ //Topic消費者 //rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99 rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的 rabbitmq.RecieveTopic() }
producer.go
func main() { //Topic模式生產(chǎn)者 imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four") for i := 0; i <= 10; i++ { imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i)) imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } }
2 Kafka
2.1 基本概念
Kafka是分布式的,其所有的構件borker(server服務端集群)、producer(消息生產(chǎn))、consumer(消息消費者)都可以是分布式的。
producer給broker發(fā)送數(shù)據(jù),這些消息會存到kafka server里,然后consumer再向kafka server發(fā)起請求去消費這些數(shù)據(jù)。
kafka server在這個過程中像是一個幫你保管數(shù)據(jù)的中間商。所以kafka服務器也可以叫做broker(broker直接翻譯可以是中間人或者經(jīng)紀人的意思)。
在消息的生產(chǎn)時可以使用一個標識topic來區(qū)分,且可以進行分區(qū);每一個分區(qū)都是一個順序的、不可變的消息隊列, 并且可以持續(xù)的添加。
同時為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
消息被處理的狀態(tài)是在consumer端維護,而不是由server端維護。當失敗時能自動平衡
- 應用場景
- 監(jiān)控
- 消息隊列
- 流處理
- 日志聚合
- 持久性日志
- 基礎概念
- topic:話題
- broker:kafka服務集群,已發(fā)布的消息保存在一組服務器中,稱之為kafka集群。集群中的每一個服務器都是一個代理(broker)
- partition:分區(qū),topic物理上的分組
- message:消息,每個producer可以向一個topic主題發(fā)布一些消息
1.生產(chǎn)者從Kafka集群獲取分區(qū)leader信息
2.生產(chǎn)者將消息發(fā)送給leader
3.leader將消息寫入本地磁盤
4.follower從leader拉取消息數(shù)據(jù)
5.follower將消息寫入本地磁盤后向leader發(fā)送ACK
6.leader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK
2.2 常見模式
①點對點模式:火車站出租車搶客
發(fā)送者將消息發(fā)送到消息隊列中,消費者去消費,如果消費者有多個,他們會競爭地消費,也就是說對于某一條消息,只有一個消費者能“搶“到它。類似于火車站門口的出租車搶客的場景。
②發(fā)布訂閱模式:組間無競爭,組內有競爭
消費者訂閱對應的topic(主題),只有訂閱了對應topic消費者的才會接收到消息。
例如:
- 牛奶有很多種,光明牛奶,希望牛奶等,只有你訂閱了光明牛奶,送奶工才會把光明牛奶送到對應位置,你也才會有機會消費這個牛奶
注意
:為了提高消費者的消費能力,kafka中引入了消費者組的概念。相當于是:不同消費者組之間因為訂閱的topic不同,不會有競爭關系。但是消費者組內是有競爭關系。
例如:
- 成都、廈門的出租車司機分別組成各自的消費者組。
- 成都的出租車司機只拉成都的人,廈門的只拉廈門的人。(因此他們兩個消費者組不是競爭關系)
- 成都市內的出租車司機之間是競爭關系。(消費者組內是競爭關系)
2.3 docker-compose部署
vim docker-compose.yml
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:6.2.0 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:6.2.0 ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 #KAFKA_ADVERTISED_LISTENERS后面改為自己本地宿主機的ip,例如我本地mac的ip為192.168.0.101 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 depends_on: - zookeeper
# 進入到docker-compose.yml所在目錄,執(zhí)行下面命令 docker-compose up -d # 查看部署結果,狀態(tài)為up表明部署成功 docker-compose ps
2.4 代碼操作
# 1. 創(chuàng)建對應topic docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092 # 2. 查看topic列表 docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
①producer.go
package main import ( "fmt" "github.com/IBM/sarama" ) // 基于sarama第三方庫開發(fā)的kafka client func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follow都確認 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回 // 構造一個消息 msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") // 連接kafka client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 發(fā)送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
②consumer.go
package main import ( "fmt" "github.com/IBM/sarama" ) // kafka consumer func main() { consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) return } partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū) if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) return } fmt.Println(partitionList) for partition := range partitionList { // 遍歷所有的分區(qū) // 針對每個分區(qū)創(chuàng)建一個對應的分區(qū)消費者 pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() // 異步從每個分區(qū)消費信息 go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value)) } }(pc) } //演示時使用 select {} }
③運行效果
到此這篇關于Go操作各大消息隊列教程(RabbitMQ、Kafka)的文章就介紹到這了,更多相關Go消息隊列內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Go 如何基于IP限制HTTP訪問頻率的方法實現(xiàn)
這篇文章主要介紹了Go 如何基于IP限制HTTP訪問頻率的方法實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-11-11