RabbitMQ延時消息隊列在golang中的使用詳解
延時隊列常使用在某些業(yè)務(wù)場景,例如訂單支付超時、接收到外賣后自動確認完成訂單、定時任務(wù)、促銷過期等,使用延時隊列可以簡化系統(tǒng)的設(shè)計和開發(fā)、提高系統(tǒng)的可靠性和可用性、提高系統(tǒng)的性能。下面介紹使用RabbitMQ的延時消息隊列,使用之前先要讓RabbitMQ支持延時隊列。
在docker安裝單機版rabbitMQ
docker-compose.yaml配置文件內(nèi)容如下:
version: '3' services: rabbitmq: image: rabbitmq:3.12-management container_name: rabbitmq hostname: rabbitmq-service restart: always ports: - 5672:5672 - 15672:15672 volumes: - $PWD/data:/var/lib/rabbitmq - $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins - $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez:/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez environment: TZ: Asia/Shanghai RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest RABBITMQ_DEFAULT_VHOST: /
rabbitMQ默認不支持延時消息隊列類型,需要另外安裝插件來實現(xiàn):
- enabled_plugins 是設(shè)置默認開啟的插件,內(nèi)容為
[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus]
- rabbitmq_delayed_message_exchange-3.12.0.ez 是延時隊列插件。
啟動rabbitmq:
docker-compose up -d
可以在瀏覽器訪問管理后臺 http://localhost:15672 ,用戶名和密碼都是guest
。
點擊菜單【exchange】--> 【Add a new exchange】-->【Type】,在下拉列表中看到x-delayed-message
類型的話,說明已經(jīng)支持延時隊列了。
使用延時隊列需要指定具體某一種消息類型(direct、topic、fanout、headers),下面以direct類型的延時消息隊列為例。
生產(chǎn)端示例代碼
package main import ( "context" "fmt" "strconv" "time" amqp "github.com/rabbitmq/amqp091-go" ) var ( url = "amqp://guest:guest@127.0.0.1:5672/" exchangeName = "delayed-message-exchange-demo" ) func main() { conn, err := amqp.Dial(url) checkErr(err) defer conn.Close() ctx := context.Background() queueName := "delayed-message-queue" routingKey := "delayed-key" delayedMessageType := "direct" exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey) q, err := NewProducer(queueName, conn, exchange) checkErr(err) defer q.Close() for i := 1; i <= 5; i++ { body := time.Now().Format("2006-01-02 15:04:05.000") + " hello world " + strconv.Itoa(i) err = q.Publish(ctx, time.Second*5, []byte(body)) // 發(fā)送消息 checkErr(err) time.Sleep(time.Second) } } // Exchange 交換機 type Exchange struct { Name string // exchange名稱 Type string // exchange類型,支持direct、topic、fanout、headers、x-delayed-message RoutingKey string // 路由key XDelayedMessageType string // 延時消息類型,支持direct、topic、fanout、headers } // NewDelayedMessageExchange 實例化一個delayed-message類型交換機,參數(shù)delayedMessageType 消息類型direct、topic、fanout、headers func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange { return &Exchange{ Name: exchangeName, Type: "x-delayed-message", RoutingKey: routingKey, XDelayedMessageType: delayedMessageType, } } // Producer 生產(chǎn)者對象 type Producer struct { queueName string exchange *Exchange conn *amqp.Connection ch *amqp.Channel } // NewProducer 實例化一個生產(chǎn)者 func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) { // 創(chuàng)建管道 ch, err := conn.Channel() if err != nil { return nil, err } // 聲明交換機類型 err = ch.ExchangeDeclare( exchange.Name, // 交換機名稱 exchange.Type, // x-delayed-message true, // 是否持久化 false, // 是否自動刪除 false, // 是否公開,false即公開 false, // 是否等待 amqp.Table{ "x-delayed-type": exchange.XDelayedMessageType, // 延時消息的類型direct、topic、fanout、headers }, ) if err != nil { _ = ch.Close() return nil, err } // 聲明隊列,如果隊列不存在則自動創(chuàng)建,存在則跳過創(chuàng)建 q, err := ch.QueueDeclare( queueName, // 消息隊列名稱 true, // 是否持久化 false, // 是否自動刪除 false, // 是否具有排他性(僅創(chuàng)建它的程序才可用) false, // 是否阻塞處理 nil, // 額外的屬性 ) if err != nil { _ = ch.Close() return nil, err } // 綁定隊列和交換機 err = ch.QueueBind( q.Name, exchange.RoutingKey, exchange.Name, false, nil, ) if err != nil { _ = ch.Close() return nil, err } return &Producer{ queueName: queueName, conn: conn, ch: ch, exchange: exchange, }, nil } // Publish 發(fā)送消息 func (p *Producer) Publish(ctx context.Context, delayTime time.Duration, body []byte) error { err := p.ch.PublishWithContext( ctx, p.exchange.Name, // exchange name p.exchange.RoutingKey, // key false, // mandatory 如果為true,根據(jù)自身exchange類型和routingKey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者 false, // immediate 如果為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者 amqp.Publishing{ DeliveryMode: amqp.Persistent, // 如果隊列的聲明是持久化的,那么消息也設(shè)置為持久化 ContentType: "text/plain", Body: body, Headers: amqp.Table{ "x-delay": int(delayTime / time.Millisecond), // 延遲時間: 毫秒 }, }, ) if err != nil { return err } fmt.Printf("[send]: %s\n", body) return nil } // Close 關(guān)閉生產(chǎn)者 func (p *Producer) Close() { if p.ch != nil { _ = p.ch.Close() } } func checkErr(err error) { if err != nil { panic(err) } }
消費端示例代碼
package main import ( "context" "fmt" "os" "os/signal" "syscall" "time" amqp "github.com/rabbitmq/amqp091-go" ) var ( url = "amqp://guest:guest@127.0.0.1:5672/" exchangeName = "delayed-message-exchange-demo" ) func main() { conn, err := amqp.Dial(url) checkErr(err) defer conn.Close() ctx := context.Background() queueName := "delayed-message-queue" routingKey := "delayed-key" delayedMessageType := "direct" exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey) c, err := NewConsumer(ctx, queueName, exchange, conn) checkErr(err) c.Consume() // 消費消息 defer c.Close() fmt.Println("exit press CTRL+C") interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) <-interrupt fmt.Println("exit consume messages") } // Exchange 交換機 type Exchange struct { Name string // exchange名稱 Type string // exchange類型,支持direct、topic、fanout、headers、x-delayed-message RoutingKey string // 路由key XDelayedMessageType string // 延時消息類型,支持direct、topic、fanout、headers } // NewDelayedMessageExchange 實例化一個delayed-message類型交換機,參數(shù)delayedMessageType 消息類型direct、topic、fanout、headers func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange { return &Exchange{ Name: exchangeName, Type: "x-delayed-message", RoutingKey: routingKey, XDelayedMessageType: delayedMessageType, } } // Consumer 消費者 type Consumer struct { ctx context.Context queueName string conn *amqp.Connection ch *amqp.Channel delivery <-chan amqp.Delivery exchange *Exchange } // NewConsumer 實例化一個消費者 func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) { // 創(chuàng)建管道 ch, err := conn.Channel() if err != nil { return nil, err } // 聲明交換機類型 err = ch.ExchangeDeclare( exchange.Name, // 交換機名稱 exchange.Type, // 交換機的類型,支持direct、topic、fanout、headers true, // 是否持久化 false, // 是否自動刪除 false, // 是否公開,false即公開 false, // 是否等待 amqp.Table{ "x-delayed-type": exchange.XDelayedMessageType, // 延時消息的類型direct、topic、fanout、headers }, ) if err != nil { _ = ch.Close() return nil, err } // 聲明隊列,如果隊列不存在則自動創(chuàng)建,存在則跳過創(chuàng)建 q, err := ch.QueueDeclare( queueName, // 消息隊列名稱 true, // 是否持久化 false, // 是否自動刪除 false, // 是否具有排他性(僅創(chuàng)建它的程序才可用) false, // 是否阻塞處理 nil, // 額外的屬性 ) if err != nil { _ = ch.Close() return nil, err } // 綁定隊列和交換機 err = ch.QueueBind( q.Name, exchange.RoutingKey, exchange.Name, false, nil, ) if err != nil { _ = ch.Close() return nil, err } // 為消息隊列注冊消費者 delivery, err := ch.ConsumeWithContext( ctx, queueName, // queue 名稱 "", // consumer 用來區(qū)分多個消費者 true, // auto-ack 是否自動應(yīng)答 false, // exclusive 是否獨有 false, // no-local 如果設(shè)置為true,表示不能將同一個Connection中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中的消費者 false, // no-wait 是否阻塞 nil, // args ) if err != nil { _ = ch.Close() return nil, err } return &Consumer{ queueName: queueName, conn: conn, ch: ch, delivery: delivery, exchange: exchange, }, nil } // Consume 接收消息 func (c *Consumer) Consume() { go func() { fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey) for d := range c.delivery { // 處理消息 fmt.Printf("%s %s [received]: %s\n", time.Now().Format("2006-01-02 15:04:05.000"), c.exchange.RoutingKey, d.Body) // _ = d.Ack(false) // 如果auto-ack為false時,需要手動ack } }() } // Close 關(guān)閉 func (c *Consumer) Close() { if c.ch != nil { _ = c.ch.Close() } } func checkErr(err error) { if err != nil { panic(err) } }
總結(jié)
上面介紹了rabbitMQ延時消息隊列簡單使用示例,在實際使用中,連接rabbitMQ應(yīng)該有網(wǎng)絡(luò)斷開重連功能。
rabbitMQ需要依賴插件rabbitmq_delayed_message_exchange,目前該插件的當前設(shè)計并不真正適合包含大量延遲消息(例如數(shù)十萬以上)的場景,另外該插件的一個可變性來源是依賴于 Erlang 計時器,在系統(tǒng)中使用了一定數(shù)量的長時間計時器之后,它們開始爭用調(diào)度程序資源,并且時間漂移不斷累積。
如果你采用了 Delayed Message 插件這種方式來實現(xiàn),對于消息可靠性要求非常高,在發(fā)送消息之前可以先保存到 DB 打標記,消費之后將消息標記為已消費,中間可以加入定時任務(wù)做檢測,這可以進一步保證你的消息的可靠性。
這是在github.com/rabbitmq/amqp091-go
基礎(chǔ)上封裝的 rabbitmq 庫,開箱即用各種消息類型(direct
, topic
, fanout
, headers
, delayed message
, publisher subscriber
)。
到此這篇關(guān)于RabbitMQ延時消息隊列在golang中的使用詳解的文章就介紹到這了,更多相關(guān)go RabbitMQ延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go Excelize API源碼閱讀GetPageLayout及SetPageMargins
這篇文章主要為大家介紹了Go Excelize API源碼閱讀GetPageLayout及SetPageMargins的方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08Go空結(jié)構(gòu)體struct{}的作用是什么
本文主要介紹了Go空結(jié)構(gòu)體struct{}的作用是什么,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-02-02Go語言實現(xiàn)互斥鎖、隨機數(shù)、time、List
這篇文章主要介紹了Go語言實現(xiàn)互斥鎖、隨機數(shù)、time、List的相關(guān)資料,需要的朋友可以參考下2018-10-10