亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RabbitMQ延時消息隊列在golang中的使用詳解

 更新時間:2023年11月01日 08:18:52   作者:zhuyasen  
延時隊列常使用在某些業(yè)務(wù)場景,使用延時隊列可以簡化系統(tǒng)的設(shè)計和開發(fā)、提高系統(tǒng)的可靠性和可用性、提高系統(tǒng)的性能,下面我們就來看看如何在golang中使用RabbitMQ的延時消息隊列吧

延時隊列常使用在某些業(yè)務(wù)場景,例如訂單支付超時、接收到外賣后自動確認完成訂單、定時任務(wù)、促銷過期等,使用延時隊列可以簡化系統(tǒng)的設(shè)計和開發(fā)、提高系統(tǒng)的可靠性和可用性提高系統(tǒng)的性能。下面介紹使用RabbitMQ的延時消息隊列,使用之前先要讓RabbitMQ支持延時隊列。

在docker安裝單機版rabbitMQ

docker-compose.yaml配置文件內(nèi)容如下:

version: '3'
 
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    hostname: rabbitmq-service
    restart: always
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - $PWD/data:/var/lib/rabbitmq
      - $PWD/plugins/enabled_plugins:/etc/rabbitmq/enabled_plugins
      - $PWD/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez:/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: /

rabbitMQ默認不支持延時消息隊列類型,需要另外安裝插件來實現(xiàn):

  • enabled_plugins 是設(shè)置默認開啟的插件,內(nèi)容為 [rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus]
  • rabbitmq_delayed_message_exchange-3.12.0.ez 是延時隊列插件。

啟動rabbitmq:

docker-compose up -d

可以在瀏覽器訪問管理后臺 http://localhost:15672 ,用戶名和密碼都是guest。

點擊菜單【exchange】--> 【Add a new exchange】-->【Type】,在下拉列表中看到x-delayed-message類型的話,說明已經(jīng)支持延時隊列了。

使用延時隊列需要指定具體某一種消息類型(direct、topic、fanout、headers),下面以direct類型的延時消息隊列為例。

生產(chǎn)端示例代碼

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

var (
	url          = "amqp://guest:guest@127.0.0.1:5672/"
	exchangeName = "delayed-message-exchange-demo"
)

func main() {
	conn, err := amqp.Dial(url)
	checkErr(err)
	defer conn.Close()

	ctx := context.Background()

	queueName := "delayed-message-queue"
	routingKey := "delayed-key"
	delayedMessageType := "direct"
	exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
	q, err := NewProducer(queueName, conn, exchange)
	checkErr(err)
	defer q.Close()
	for i := 1; i <= 5; i++ {
		body := time.Now().Format("2006-01-02 15:04:05.000") + " hello world " + strconv.Itoa(i)
		err = q.Publish(ctx, time.Second*5, []byte(body)) // 發(fā)送消息
		checkErr(err)
		time.Sleep(time.Second)
	}
}

// Exchange 交換機
type Exchange struct {
	Name                string // exchange名稱
	Type                string // exchange類型,支持direct、topic、fanout、headers、x-delayed-message
	RoutingKey          string // 路由key
	XDelayedMessageType string // 延時消息類型,支持direct、topic、fanout、headers
}

// NewDelayedMessageExchange 實例化一個delayed-message類型交換機,參數(shù)delayedMessageType 消息類型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
	return &Exchange{
		Name:                exchangeName,
		Type:                "x-delayed-message",
		RoutingKey:          routingKey,
		XDelayedMessageType: delayedMessageType,
	}
}

// Producer 生產(chǎn)者對象
type Producer struct {
	queueName string
	exchange  *Exchange
	conn      *amqp.Connection
	ch        *amqp.Channel
}

// NewProducer 實例化一個生產(chǎn)者
func NewProducer(queueName string, conn *amqp.Connection, exchange *Exchange) (*Producer, error) {
	// 創(chuàng)建管道
	ch, err := conn.Channel()
	if err != nil {
		return nil, err
	}

	// 聲明交換機類型
	err = ch.ExchangeDeclare(
		exchange.Name, // 交換機名稱
		exchange.Type, //  x-delayed-message
		true,          // 是否持久化
		false,         // 是否自動刪除
		false,         // 是否公開,false即公開
		false,         // 是否等待
		amqp.Table{
			"x-delayed-type": exchange.XDelayedMessageType, // 延時消息的類型direct、topic、fanout、headers
		},
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 聲明隊列,如果隊列不存在則自動創(chuàng)建,存在則跳過創(chuàng)建
	q, err := ch.QueueDeclare(
		queueName, // 消息隊列名稱
		true,      // 是否持久化
		false,     // 是否自動刪除
		false,     // 是否具有排他性(僅創(chuàng)建它的程序才可用)
		false,     // 是否阻塞處理
		nil,       // 額外的屬性
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 綁定隊列和交換機
	err = ch.QueueBind(
		q.Name,
		exchange.RoutingKey,
		exchange.Name,
		false,
		nil,
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	return &Producer{
		queueName: queueName,
		conn:      conn,
		ch:        ch,
		exchange:  exchange,
	}, nil
}

// Publish 發(fā)送消息
func (p *Producer) Publish(ctx context.Context, delayTime time.Duration, body []byte) error {
	err := p.ch.PublishWithContext(
		ctx,
		p.exchange.Name,       // exchange name
		p.exchange.RoutingKey, // key
		false,                 // mandatory 如果為true,根據(jù)自身exchange類型和routingKey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者
		false,                 // immediate 如果為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者
		amqp.Publishing{
			DeliveryMode: amqp.Persistent, // 如果隊列的聲明是持久化的,那么消息也設(shè)置為持久化
			ContentType:  "text/plain",
			Body:         body,
			Headers: amqp.Table{
				"x-delay": int(delayTime / time.Millisecond), // 延遲時間: 毫秒
			},
		},
	)
	if err != nil {
		return err
	}
	fmt.Printf("[send]: %s\n", body)
	return nil
}

// Close 關(guān)閉生產(chǎn)者
func (p *Producer) Close() {
	if p.ch != nil {
		_ = p.ch.Close()
	}
}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

消費端示例代碼

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

var (
	url          = "amqp://guest:guest@127.0.0.1:5672/"
	exchangeName = "delayed-message-exchange-demo"
)

func main() {
	conn, err := amqp.Dial(url)
	checkErr(err)
	defer conn.Close()

	ctx := context.Background()

	queueName := "delayed-message-queue"
	routingKey := "delayed-key"
	delayedMessageType := "direct"
	exchange := NewDelayedMessageExchange(exchangeName, delayedMessageType, routingKey)
	c, err := NewConsumer(ctx, queueName, exchange, conn)
	checkErr(err)
	c.Consume() // 消費消息
	defer c.Close()

	fmt.Println("exit press CTRL+C")
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-interrupt
	fmt.Println("exit consume messages")
}

// Exchange 交換機
type Exchange struct {
	Name                string // exchange名稱
	Type                string // exchange類型,支持direct、topic、fanout、headers、x-delayed-message
	RoutingKey          string // 路由key
	XDelayedMessageType string // 延時消息類型,支持direct、topic、fanout、headers
}

// NewDelayedMessageExchange 實例化一個delayed-message類型交換機,參數(shù)delayedMessageType 消息類型direct、topic、fanout、headers
func NewDelayedMessageExchange(exchangeName string, delayedMessageType string, routingKey string) *Exchange {
	return &Exchange{
		Name:                exchangeName,
		Type:                "x-delayed-message",
		RoutingKey:          routingKey,
		XDelayedMessageType: delayedMessageType,
	}
}

// Consumer 消費者
type Consumer struct {
	ctx       context.Context
	queueName string
	conn      *amqp.Connection
	ch        *amqp.Channel
	delivery  <-chan amqp.Delivery
	exchange  *Exchange
}

// NewConsumer 實例化一個消費者
func NewConsumer(ctx context.Context, queueName string, exchange *Exchange, conn *amqp.Connection) (*Consumer, error) {
	// 創(chuàng)建管道
	ch, err := conn.Channel()
	if err != nil {
		return nil, err
	}

	// 聲明交換機類型
	err = ch.ExchangeDeclare(
		exchange.Name, // 交換機名稱
		exchange.Type, // 交換機的類型,支持direct、topic、fanout、headers
		true,          // 是否持久化
		false,         // 是否自動刪除
		false,         // 是否公開,false即公開
		false,         // 是否等待
		amqp.Table{
			"x-delayed-type": exchange.XDelayedMessageType, // 延時消息的類型direct、topic、fanout、headers
		},
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 聲明隊列,如果隊列不存在則自動創(chuàng)建,存在則跳過創(chuàng)建
	q, err := ch.QueueDeclare(
		queueName, // 消息隊列名稱
		true,      // 是否持久化
		false,     // 是否自動刪除
		false,     // 是否具有排他性(僅創(chuàng)建它的程序才可用)
		false,     // 是否阻塞處理
		nil,       // 額外的屬性
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 綁定隊列和交換機
	err = ch.QueueBind(
		q.Name,
		exchange.RoutingKey,
		exchange.Name,
		false,
		nil,
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	// 為消息隊列注冊消費者
	delivery, err := ch.ConsumeWithContext(
		ctx,
		queueName, // queue 名稱
		"",        // consumer 用來區(qū)分多個消費者
		true,      // auto-ack 是否自動應(yīng)答
		false,     // exclusive 是否獨有
		false,     // no-local 如果設(shè)置為true,表示不能將同一個Connection中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中的消費者
		false,     // no-wait 是否阻塞
		nil,       // args
	)
	if err != nil {
		_ = ch.Close()
		return nil, err
	}

	return &Consumer{
		queueName: queueName,
		conn:      conn,
		ch:        ch,
		delivery:  delivery,
		exchange:  exchange,
	}, nil
}

// Consume 接收消息
func (c *Consumer) Consume() {
	go func() {
		fmt.Printf("waiting for messages, type=%s, queue=%s, key=%s\n", c.exchange.Type, c.queueName, c.exchange.RoutingKey)
		for d := range c.delivery {
			// 處理消息
			fmt.Printf("%s %s [received]: %s\n", time.Now().Format("2006-01-02 15:04:05.000"), c.exchange.RoutingKey, d.Body)
			// _ = d.Ack(false) // 如果auto-ack為false時,需要手動ack
		}
	}()
}

// Close 關(guān)閉
func (c *Consumer) Close() {
	if c.ch != nil {
		_ = c.ch.Close()
	}
}

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

總結(jié)

上面介紹了rabbitMQ延時消息隊列簡單使用示例,在實際使用中,連接rabbitMQ應(yīng)該有網(wǎng)絡(luò)斷開重連功能。

rabbitMQ需要依賴插件rabbitmq_delayed_message_exchange,目前該插件的當前設(shè)計并不真正適合包含大量延遲消息(例如數(shù)十萬以上)的場景,另外該插件的一個可變性來源是依賴于 Erlang 計時器,在系統(tǒng)中使用了一定數(shù)量的長時間計時器之后,它們開始爭用調(diào)度程序資源,并且時間漂移不斷累積。

如果你采用了 Delayed Message 插件這種方式來實現(xiàn),對于消息可靠性要求非常高,在發(fā)送消息之前可以先保存到 DB 打標記,消費之后將消息標記為已消費,中間可以加入定時任務(wù)做檢測,這可以進一步保證你的消息的可靠性。

這是在github.com/rabbitmq/amqp091-go基礎(chǔ)上封裝的 rabbitmq 庫,開箱即用各種消息類型(directtopicfanoutheadersdelayed messagepublisher subscriber)。

到此這篇關(guān)于RabbitMQ延時消息隊列在golang中的使用詳解的文章就介紹到這了,更多相關(guān)go RabbitMQ延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Go Excelize API源碼閱讀GetPageLayout及SetPageMargins

    Go Excelize API源碼閱讀GetPageLayout及SetPageMargins

    這篇文章主要為大家介紹了Go Excelize API源碼閱讀GetPageLayout及SetPageMargins的方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-08-08
  • 關(guān)于go語言編碼需要放到src 文件夾下的問題

    關(guān)于go語言編碼需要放到src 文件夾下的問題

    這篇文章主要介紹了go語言編碼需要放到src 文件夾下的相關(guān)知識,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-10-10
  • Go空結(jié)構(gòu)體struct{}的作用是什么

    Go空結(jié)構(gòu)體struct{}的作用是什么

    本文主要介紹了Go空結(jié)構(gòu)體struct{}的作用是什么,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-02-02
  • Go 語言 IDE 中的 VSCode 配置使用教程

    Go 語言 IDE 中的 VSCode 配置使用教程

    Gogland 是 JetBrains 公司推出的Go語言集成開發(fā)環(huán)境。這篇文章主要介紹了Go 語言 IDE 中的 VSCode 配置使用教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-05-05
  • Go+Redis實現(xiàn)常見限流算法的示例代碼

    Go+Redis實現(xiàn)常見限流算法的示例代碼

    限流是項目中經(jīng)常需要使用到的一種工具,一般用于限制用戶的請求的頻率,也可以避免瞬間流量過大導致系統(tǒng)崩潰,或者穩(wěn)定消息處理速率。這篇文章主要是使用Go+Redis實現(xiàn)常見的限流算法,需要的可以參考一下
    2023-04-04
  • 深入解析Go語言編程中的遞歸使用

    深入解析Go語言編程中的遞歸使用

    這篇文章主要介紹了Go語言編程中的遞歸使用,是Go語言入門學習中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-11-11
  • 使用Golang編寫一個簡單的命令行工具

    使用Golang編寫一個簡單的命令行工具

    Cobra是一個強大的開源工具,能夠幫助我們快速構(gòu)建出優(yōu)雅且功能豐富的命令行應(yīng)用,本文將利用Cobra編寫一個簡單的命令行工具,感興趣的可以了解下
    2023-12-12
  • Go語言實現(xiàn)互斥鎖、隨機數(shù)、time、List

    Go語言實現(xiàn)互斥鎖、隨機數(shù)、time、List

    這篇文章主要介紹了Go語言實現(xiàn)互斥鎖、隨機數(shù)、time、List的相關(guān)資料,需要的朋友可以參考下
    2018-10-10
  • Go-家庭收支記賬軟件項目實現(xiàn)

    Go-家庭收支記賬軟件項目實現(xiàn)

    這篇文章主要介紹了Go-家庭收支記賬軟件項目實現(xiàn),本文章內(nèi)容詳細,具有很好的參考價值,希望對大家有所幫助,需要的朋友可以參考下
    2023-01-01
  • golang?gorm錯誤處理事務(wù)以及日志用法示例

    golang?gorm錯誤處理事務(wù)以及日志用法示例

    這篇文章主要為大家介紹了golang?gorm錯誤處理事務(wù)以及日志用法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪
    2022-04-04

最新評論