亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)

 更新時(shí)間:2024年10月11日 09:39:36   作者:寰夢(mèng)  
本文詳細(xì)介紹了使用RabbitTemplate進(jìn)行消息傳遞的幾種模式,包括點(diǎn)對(duì)點(diǎn)通信、發(fā)布/訂閱模式、工作隊(duì)列模式、路由模式和主題模式,每種模式都通過代碼示例展示了生產(chǎn)者和消費(fèi)者的實(shí)現(xiàn),幫助開發(fā)者理解和運(yùn)用RabbitMQ進(jìn)行高效的消息處理

一、RabbitTemplate 的使用

1.【導(dǎo)入依賴】

<!-- rabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.1</version>
</dependency>

2.【添加配置】

rabbitmq:
    host:   #ip地址
    port: 5672 #端口
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 默認(rèn)每次取出一條消息消費(fèi), 消費(fèi)完成取下一條
        acknowledge-mode: manual # 設(shè)置消費(fèi)端手動(dòng)ack確認(rèn)
        retry:
          enabled: true # 是否支持重試
    publisher-confirm-type: correlated  #確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
    publisher-returns: true  #確認(rèn)消息已發(fā)送到隊(duì)列(Queue)

3.【點(diǎn)對(duì)點(diǎn)通信(隊(duì)列模式)(Point-to-Point Messaging)】

使用方式:

這種方式也被稱為隊(duì)列(Queue)模型。消息發(fā)送者(Producer)發(fā)送消息到隊(duì)列,然后消息接收者(Consumer)從隊(duì)列中獲取消息進(jìn)行處理。這種模型下,每個(gè)消息只有一個(gè)消費(fèi)者可以接收,確保消息的可靠傳遞和順序處理。

代碼示例: 生產(chǎn)者

    /**
     * 第一種模型: 簡(jiǎn)單模型
     * 一個(gè)消息生產(chǎn)者  一個(gè)隊(duì)列  一個(gè)消費(fèi)者
     * @return
     */
    @GetMapping("hello/world")
    public void helloWorld() {
        SysUser sysUser = new SysUser();
        // 發(fā)送消息
        // 第一個(gè)參數(shù): String routingKey 路由規(guī)則 【交換機(jī) 和隊(duì)列的綁定規(guī)則 】  隊(duì)列名稱
        // 第二個(gè)參數(shù): object message 消息的內(nèi)容
//        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!");
        ///  MessagePostProcessor 消息包裝器  如果需要對(duì)消息進(jìn)行包裝
        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> {
            // 設(shè)置唯一的標(biāo)識(shí)
            message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
            return message;
        });

消費(fèi)者

import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class HelloWorldConsumer {
    @Autowired
    private StringRedisTemplate redisTemplate;
    /**
     * 監(jiān)聽 hello_world_queue 隊(duì)列消費(fèi)消息
     * queues 監(jiān)聽隊(duì)列的名稱  要求這個(gè)隊(duì)列必須是已經(jīng)存在的隊(duì)列
     * queuesToDeclare 監(jiān)聽隊(duì)列 如果這個(gè)隊(duì)列不存在 則 rabbitMQ 中 RabbitAdmin 會(huì)幫助去構(gòu)建這個(gè)隊(duì)列
     */
    @RabbitListener(queuesToDeclare = @Queue("hello_world_queue"))
    public void helloWorldConsumer(String msg, Message message, Channel channel) {
        // 獲取消息的唯一標(biāo)識(shí)
        String messageId = message.getMessageProperties().getMessageId();
        // 將消息添加到 Redis的set集合中  set 不能重復(fù)的  方法的返回值 添加成功的數(shù)量
        Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId);
        if (count != null && count == 1) {
            // 沒有消費(fèi)過   正常消費(fèi)
            log.info("hello_world_queue隊(duì)列消費(fèi)者接收到了消息,消息內(nèi)容:{}", message);
        }
    }
}

4.【發(fā)布/訂閱模式(Publish/Subscribe Messaging)】

使用方式:

在發(fā)布/訂閱模式中,消息發(fā)送者將消息發(fā)布到交換機(jī)(Exchange),而不是直接發(fā)送到隊(duì)列。交換機(jī)負(fù)責(zé)將消息路由到一個(gè)或多個(gè)綁定的隊(duì)列中。每個(gè)訂閱者(Subscriber)可以選擇訂閱它感興趣的消息隊(duì)列,從而接收消息。

代碼示例: 生產(chǎn)者

/**
 * 工作隊(duì)列
 * 一個(gè)生產(chǎn)者  一個(gè)隊(duì)列  多個(gè)消費(fèi)者
 */
@GetMapping("work/queue")
public void workQueue() {
    for (int i = 1; i <= 10; i++) {
        rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!");
    }
}

消費(fèi)者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class WorkQueueConsumer {
    /***
     * 消費(fèi)者1
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer(String message) throws InterruptedException {
        Thread.sleep(200);
        log.info("work_queue隊(duì)列消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
    }
    /***
     * 消費(fèi)者2
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void workQueueConsumer2(String message) throws InterruptedException {
        Thread.sleep(400);
        log.info("work_queue隊(duì)列消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
    }
}

5.【工作隊(duì)列模式(Work Queues)】

使用方式:

工作隊(duì)列模式也稱為任務(wù)隊(duì)列(Task Queues),它可以用來實(shí)現(xiàn)任務(wù)的異步處理。多個(gè)工作者(Worker)同時(shí)監(jiān)聽同一個(gè)隊(duì)列,當(dāng)有新的任務(wù)消息被發(fā)送到隊(duì)列中時(shí),空閑的工作者會(huì)獲取并處理這些任務(wù),確保任務(wù)能夠并行處理而不會(huì)重復(fù)執(zhí)行。

代碼示例: 生產(chǎn)者

/**
 * 發(fā)布訂閱
 * 一個(gè)生產(chǎn)者  多個(gè)隊(duì)列   多個(gè)消費(fèi)者   涉及 到交換機(jī)  fanout
 */
@GetMapping("publish/subscribe")
public void publishSubscribe() {
    // 第一個(gè)參數(shù): 交換機(jī)的名稱  沒有要求
    // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則    如果是發(fā)布訂閱模式 那么這個(gè)規(guī)則默認(rèn)不寫 只需要交換機(jī)和隊(duì)列綁定即可不需要規(guī)則
    // 第三個(gè)參數(shù): 消息內(nèi)容
    rabbitTemplate.convertAndSend("publish_subscribe_exchange", "",
            "hello publisher subscribe!!");
}

消費(fèi)者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class PublisherSubscribeConsumer {
    private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class);
    /**
     * 發(fā)布訂閱模型消費(fèi)者
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"),
            exchange = @Exchange(name = "publish_subscribe_exchange",
                    type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe(String message) {
        log.info("發(fā)布訂閱模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
    }
    /**
     * 發(fā)布訂閱模型消費(fèi)者
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"),
            exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT)))
    public void publisherSubscribe2(String message) {
        log.info("發(fā)布訂閱模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
    }
}

6.【路由模式(Routing)】

使用方式:

路由模式允許發(fā)送者根據(jù)消息的路由鍵(Routing Key)將消息路由到特定的隊(duì)列。發(fā)送者將消息發(fā)送到交換機(jī),并且通過設(shè)置不同的路由鍵,使消息能夠被交換機(jī)路由到不同的隊(duì)列。消費(fèi)者可以根據(jù)需要選擇監(jiān)聽哪些隊(duì)列來接收消息。

代碼示例: 生產(chǎn)者

/**
 * 路由模型
 * 一個(gè)生產(chǎn)者  多個(gè)隊(duì)列   多個(gè)消費(fèi)者   涉及 到交換機(jī)  direct
 */
@GetMapping("routing")
public void routing() {
    // 第一個(gè)參數(shù): 交換機(jī)的名稱  沒有要求
    // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則    字符串 隨意
    // 第三個(gè)參數(shù): 消息內(nèi)容
    rabbitTemplate.convertAndSend("routing_exchange", "aaa",
            "hello routing!!");
}

消費(fèi)者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class RoutingConsumer {
    /**
     * 路由模型消費(fèi)者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "abc", "error", "info" }))
    public void routingConsumer(String message) {
        log.info("路由模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message);
    }
    /**
     * 路由模型消費(fèi)者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "aaa", "ccc", "waadaffas" }))
    public void routingConsumer2(String message) {
        log.info("路由模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message);
    }
    /**
     * 路由模型消費(fèi)者
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"),
            exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),
            key = { "bbbb", "asdfasd", "asdfasdf" }))
    public void routingConsumer3(String message) {
        log.info("路由模型消費(fèi)者3接收到了消息,消息內(nèi)容:{}", message);
    }
}

7.【主題模式(Topics)】

使用方式:

主題模式是路由模式的一種擴(kuò)展,它允許發(fā)送者根據(jù)消息的多個(gè)屬性(如主題)將消息路由到一個(gè)或多個(gè)隊(duì)列。主題交換機(jī)(Topic Exchange)使用通配符匹配路由鍵與隊(duì)列綁定鍵的模式,從而實(shí)現(xiàn)更靈活的消息路由和過濾。

代碼示例: 生產(chǎn)者

/**
 * 主題模型
 * 一個(gè)生產(chǎn)者  多個(gè)隊(duì)列   多個(gè)消費(fèi)者   涉及 到交換機(jī)  topic
 */
@GetMapping("topic")
public void topic() {
    // 第一個(gè)參數(shù): 交換機(jī)的名稱  沒有要求
    // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則    多個(gè)單詞  以 “.” 拼起來
    // 第三個(gè)參數(shù): 消息內(nèi)容
    rabbitTemplate.convertAndSend("topic_exchange", "bwie.age.name",
            "hello topic!!");
}

消費(fèi)者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class TopicConsumer {
    /**
     * *  表示任意一個(gè)單詞
     * #  表示任意一個(gè)單詞 或 多個(gè)
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "error.*.info", "#.name" }))
    public void topicConsumer(String message) {
        log.info("xxxxxxxxx1");
    }
    /**
     * *  表示任意一個(gè)單詞
     * #  表示任意一個(gè)單詞 或 多個(gè)
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "abc.*", "username" }))
    public void topicConsumer2(String message) {
        log.info("xxxxxxxxx2");
    }
    /**
     * *  表示任意一個(gè)單詞
     * #  表示任意一個(gè)單詞 或 多個(gè)
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"),
            exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),
            key = { "bwie.*", "error.*.info" }))
    public void topicConsumer3(String message) {
        log.info("xxxxxxxxx3");
    }
}

到此這篇關(guān)于SpringBoot 整合 RabbitMQ 的使用的文章就介紹到這了,更多相關(guān)SpringBoot 整合 RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java 給圖片和動(dòng)圖添加水印的方法

    Java 給圖片和動(dòng)圖添加水印的方法

    本篇文章主要介紹了Java 給圖片和動(dòng)圖添加水印的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-04-04
  • SpringBoot整合Thymeleaf的方法

    SpringBoot整合Thymeleaf的方法

    這篇文章主要介紹了SpringBoot整合Thymeleaf的方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下,希望能夠幫助到你
    2021-07-07
  • Hibernate的各種保存方式的區(qū)別詳解

    Hibernate的各種保存方式的區(qū)別詳解

    今天小編就為大家分享一篇關(guān)于Hibernate的各種保存方式的區(qū)別詳解,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • SpringBoot+Shiro+Redis+Mybatis-plus 實(shí)戰(zhàn)項(xiàng)目及問題小結(jié)

    SpringBoot+Shiro+Redis+Mybatis-plus 實(shí)戰(zhàn)項(xiàng)目及問題小結(jié)

    最近也是一直在保持學(xué)習(xí)課外拓展技術(shù),所以想自己做一個(gè)簡(jiǎn)單小項(xiàng)目,于是就有了這個(gè)快速上手 Shiro 和 Redis 的小項(xiàng)目,說白了就是拿來練手調(diào)調(diào) API,然后做完后拿來總結(jié)的小項(xiàng)目,感興趣的朋友一起看看吧
    2021-04-04
  • Hibernate實(shí)現(xiàn)悲觀鎖和樂觀鎖代碼介紹

    Hibernate實(shí)現(xiàn)悲觀鎖和樂觀鎖代碼介紹

    這篇文章主要介紹了Hibernate實(shí)現(xiàn)悲觀鎖和樂觀鎖的有關(guān)內(nèi)容,涉及hibernate的隔離機(jī)制,以及實(shí)現(xiàn)悲觀鎖和樂觀鎖的代碼實(shí)現(xiàn),需要的朋友可以了解下。
    2017-09-09
  • Java數(shù)據(jù)結(jié)構(gòu)之鏈表詳解

    Java數(shù)據(jù)結(jié)構(gòu)之鏈表詳解

    本篇文章我們將講解一種新型的數(shù)據(jù)結(jié)構(gòu)—鏈表,鏈表是一種使用廣泛的通用數(shù)據(jù)結(jié)構(gòu),它可以用來作為實(shí)現(xiàn)棧,隊(duì)列等數(shù)據(jù)結(jié)構(gòu)的基礎(chǔ).文中有非常詳細(xì)的介紹,需要的朋友可以參考下
    2021-05-05
  • SpringBoot接口返回結(jié)果封裝方法實(shí)例詳解

    SpringBoot接口返回結(jié)果封裝方法實(shí)例詳解

    在實(shí)際項(xiàng)目中,一般會(huì)把結(jié)果放在一個(gè)封裝類中,封裝類中包含http狀態(tài)值,狀態(tài)消息,以及實(shí)際的數(shù)據(jù)。這里主要記錄兩種方式,通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧
    2021-09-09
  • Spring ApplicationContext接口功能詳細(xì)介紹

    Spring ApplicationContext接口功能詳細(xì)介紹

    ApplicationContext是Spring應(yīng)用程序中的中央接口,由于繼承了多個(gè)組件,使得ApplicationContext擁有了許多Spring的核心功能,如獲取bean組件,注冊(cè)監(jiān)聽事件,加載資源文件等
    2023-02-02
  • 解決java.lang.Error: Unresolved compilation problems:問題

    解決java.lang.Error: Unresolved compilation pro

    這篇文章主要介紹了解決java.lang.Error: Unresolved compilation problems:問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Java基礎(chǔ)知識(shí)匯總

    Java基礎(chǔ)知識(shí)匯總

    這篇文章對(duì)Java編程語言的基礎(chǔ)知識(shí)作了一個(gè)較為全面的匯總,在這里給大家分享一下。需要的朋友可以參考。
    2017-09-09

最新評(píng)論