亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot集成RabbitMQ的方法(死信隊(duì)列)

 更新時(shí)間:2019年05月01日 11:21:27   作者:小揪揪  
這篇文章主要介紹了SpringBoot集成RabbitMQ的方法(死信隊(duì)列),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

介紹

死信隊(duì)列:沒有被及時(shí)消費(fèi)的消息存放的隊(duì)列,消息沒有被及時(shí)消費(fèi)有以下幾點(diǎn)原因:
1.有消息被拒絕(basic.reject/ basic.nack)并且requeue=false
2.隊(duì)列達(dá)到最大長度
3.消息TTL過期

場景

1.小時(shí)進(jìn)入初始隊(duì)列,等待30分鐘后進(jìn)入5分鐘隊(duì)列
2.消息等待5分鐘后進(jìn)入執(zhí)行隊(duì)列
3.執(zhí)行失敗后重新回到5分鐘隊(duì)列
4.失敗5次后,消息進(jìn)入2小時(shí)隊(duì)列
5.消息等待2小時(shí)進(jìn)入執(zhí)行隊(duì)列
6.失敗5次后,將消息丟棄或做其他處理

使用

安裝MQ

使用docker方式安裝,選擇帶mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

訪問 localhost: 15672,默認(rèn)賬號密碼guest/guest

項(xiàng)目配置

(1)創(chuàng)建springboot項(xiàng)目
(2)在application.properties配置文件中配置mq連接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)隊(duì)列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

  //time
  @Value("${spring.df.buffered.min:120}")
  private int springdfBufferedTime;

  @Value("${spring.df.high-buffered.min:5}")
  private int springdfHighBufferedTime;

  @Value("${spring.df.low-buffered.min:120}")
  private int springdfLowBufferedTime;

  // 30min Buffered Queue
  @Value("${spring.df.queue:spring-df-buffered-queue}")
  private String springdfBufferedQueue;

  @Value("${spring.df.topic:spring-df-buffered-topic}")
  private String springdfBufferedTopic;

  @Value("${spring.df.route:spring-df-buffered-route}")
  private String springdfBufferedRouteKey;

  // 5M Buffered Queue
  @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
  private String springdfHighBufferedQueue;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  // High Queue
  @Value("${spring.df.high.queue:spring-df-high-queue}")
  private String springdfHighQueue;

  @Value("${spring.df.high.topic:spring-df-high-topic}")
  private String springdfHighTopic;

  @Value("${spring.df.high.route:spring-df-high-route}")
  private String springdfHighRouteKey;

  // 2H Low Buffered Queue
  @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
  private String springdfLowBufferedQueue;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  // Low Queue
  @Value("${spring.df.low.queue:spring-df-low-queue}")
  private String springdfLowQueue;

  @Value("${spring.df.low.topic:spring-df-low-topic}")
  private String springdfLowTopic;

  @Value("${spring.df.low.route:spring-df-low-route}")
  private String springdfLowRouteKey;


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
  Queue springdfBufferedQueue() {
    int bufferedTime = 1000 * 60 * springdfBufferedTime;
    return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
  Queue springdfHighBufferedQueue() {
    int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
    return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
  Queue springdfHighQueue() {
    return new Queue(springdfHighQueue, true);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
  Queue springdfLowBufferedQueue() {
    int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
    return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
  Queue springdfLowQueue() {
    return new Queue(springdfLowQueue, true);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
  TopicExchange springdfBufferedTopic() {
    return new TopicExchange(springdfBufferedTopic);
  }

  @Bean
  Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
    return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
  TopicExchange springdfHighBufferedTopic() {
    return new TopicExchange(springdfHighBufferedTopic);
  }

  @Bean
  Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
    return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
  TopicExchange springdfHighTopic() {
    return new TopicExchange(springdfHighTopic);
  }

  @Bean
  Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
    return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
  TopicExchange springdfLowBufferedTopic() {
    return new TopicExchange(springdfLowBufferedTopic);
  }

  @Bean
  Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
    return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
  TopicExchange springdfLowTopic() {
    return new TopicExchange(springdfLowTopic);
  }

  @Bean
  Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
    return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
  }


  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                       MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(springdfHighQueue, springdfLowQueue);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {


    MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
    adapter.setDefaultListenerMethod("receive");
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
    queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    return adapter;

  }


  private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", topic);
    args.put("x-dead-letter-routing-key", routeKey);
    args.put("x-message-ttl", bufferedTime);
    // 是否持久化
    boolean durable = true;
    // 僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除
    boolean exclusive = false;
    // 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除隊(duì)列
    boolean autoDelete = false;

    return new Queue(queueName, durable, exclusive, autoDelete, args);
  }
}

消費(fèi)者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

  private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

  @Value("${high-retry:5}")
  private int highRetry;

  @Value("${low-retry:5}")
  private int lowRetry;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  private final RabbitTemplate rabbitTemplate;
  @Autowired
  public MqReceiver(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  public void receive(Object message) {
    if (logger.isInfoEnabled()) {
      logger.info("default receiver: " + message);
    }
  }

  /**
   * 消息從初始隊(duì)列進(jìn)入5分鐘的高速緩沖隊(duì)列
   * @param message
   */
  public void highReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);

    try{
      logger.info("這里做消息處理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < highRetry) {
        msg.put("times", times + 1);
        rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
      } else {
        msg.put("times", 0);
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }
    }
  }

  /**
   * 消息從5分鐘緩沖隊(duì)列進(jìn)入2小時(shí)緩沖隊(duì)列
   * @param message
   */
  public void lowReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);
    
    try {
      logger.info("這里做消息處理...");
    }catch (Exception e){
      int times = msg.get("times") == null ? 0 : (int) msg.get("times");
      if (times < lowRetry) {
        rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
      }else{
        logger.info("消息無法被消費(fèi)...");
      }
    } 
  }
}

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • pagehelper插件顯示total為-1或1的問題

    pagehelper插件顯示total為-1或1的問題

    這篇文章主要介紹了pagehelper插件顯示total為-1或1,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • jvm支持最大線程數(shù)簡單測試

    jvm支持最大線程數(shù)簡單測試

    這篇文章主要介紹了jvm支持最大線程數(shù)簡單測試,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-11-11
  • Struts2配置文件中使用通配符的方法(三種形式)

    Struts2配置文件中使用通配符的方法(三種形式)

    Struts2是一個(gè)基于MVC設(shè)計(jì)模式的Web應(yīng)用框架,它本質(zhì)上相當(dāng)于一個(gè)servlet,在MVC設(shè)計(jì)模式中,Struts2作為控制器(Controller)來建立模型與視圖的數(shù)據(jù)交互。這篇文章主要介紹了Struts2配置文件中使用通配符的相關(guān)知識(shí),需要的朋友可以參考下
    2019-11-11
  • 學(xué)習(xí)Java之Java中的異常處理機(jī)制詳解

    學(xué)習(xí)Java之Java中的異常處理機(jī)制詳解

    在本文中,小編將帶領(lǐng)大家來學(xué)習(xí)Java的異常處理機(jī)制,包括異常機(jī)制、異常類型、如何捕獲異常、如何拋出異常以及如何創(chuàng)建自定義異常等核心內(nèi)容,感興趣的同學(xué)跟著小編一起來看看吧
    2023-08-08
  • java中final與finally的使用介紹

    java中final與finally的使用介紹

    本篇文章介紹了,在java中final與finally的使用。需要的朋友參考下
    2013-04-04
  • Java服務(wù)中的大文件上傳和下載優(yōu)化技巧分享

    Java服務(wù)中的大文件上傳和下載優(yōu)化技巧分享

    在Java服務(wù)中處理大文件的上傳和下載是一項(xiàng)常見但復(fù)雜的任務(wù),為了提供優(yōu)秀的用戶體驗(yàn)和高效的系統(tǒng)性能,我們將探索多種策略和技術(shù),并在每一點(diǎn)上都提供代碼示例以便實(shí)戰(zhàn)應(yīng)用,需要的朋友可以參考下
    2023-10-10
  • Java 如何通過Magic 魔數(shù)獲取文件類型

    Java 如何通過Magic 魔數(shù)獲取文件類型

    魔數(shù)有很多種定義,這里我們討論的主要是在編程領(lǐng)域的定義,文件的起始幾個(gè)字節(jié)的內(nèi)容是固定的,本文給大家介紹Java Magic 魔數(shù)獲取文件類型的相關(guān)知識(shí),感興趣的朋友一起看看吧
    2023-11-11
  • Java中用于SMB/CIFS網(wǎng)絡(luò)的JCIFS庫的用法詳解

    Java中用于SMB/CIFS網(wǎng)絡(luò)的JCIFS庫的用法詳解

    JCIFS是一個(gè)強(qiáng)大的庫,允許Java應(yīng)用程序無縫地與SMB/CIFS資源進(jìn)行交互,本文將探討JCIFS的概念和工作原理以及如何在?Java?應(yīng)用程序中有效使用它,希望對大家有所幫助
    2024-12-12
  • Java最全文件操作實(shí)例匯總

    Java最全文件操作實(shí)例匯總

    這篇文章主要介紹了Java最全文件操作,總結(jié)分析了大量實(shí)例,詳細(xì)匯總了Java針對文件的各種常用操作,需要的朋友可以參考下
    2015-11-11
  • Mybatis基于注解與XML開發(fā)使用流程

    Mybatis基于注解與XML開發(fā)使用流程

    MyBatis是Java的持久化框架,目的是為了使操作數(shù)據(jù)庫更加方便、靈活、高效,可以通過Java注解和XML文件來映射Java對象和SQL語句,提供了非常靈活的SQL編寫方式和動(dòng)態(tài)SQL語句的創(chuàng)建方式,這篇文章主要介紹了Mybatis基于注解與XML開發(fā),需要的朋友可以參考下
    2023-07-07

最新評論