亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼

 更新時(shí)間:2023年09月11日 09:43:42   作者:西安未央  
RabbitMQ的消息確認(rèn)有兩種, 一種是消息發(fā)送確認(rèn),第二種是消費(fèi)接收確認(rèn),本文主要介紹了springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下

概述

RabbitMQ的消息確認(rèn)有兩種。 一種是消息發(fā)送確認(rèn)。這種是用來確認(rèn)生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊(duì)列的過程中,消息是否成功投遞。發(fā)送確認(rèn)分為兩步,一是確認(rèn)是否到達(dá)交換器,二是確認(rèn)是否到達(dá)隊(duì)列。 第二種是消費(fèi)接收確認(rèn)。這種是確認(rèn)消費(fèi)者是否成功消費(fèi)了隊(duì)列中的消息。

一、運(yùn)行效果

image.png

二、實(shí)現(xiàn)過程

①、引入rabbitmq包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

②、修改application.properties配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 發(fā)送者開啟 confirm 確認(rèn)機(jī)制
spring.rabbitmq.publisher-confirms=true
# 發(fā)送者開啟 return 確認(rèn)機(jī)制
spring.rabbitmq.publisher-returns=true
####################################################
# 設(shè)置消費(fèi)端手動(dòng) ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true

③、定義exchange和queue,并將queue綁定在exchange上

package com.mm.springbootrabbitmqconfirmdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    @Bean(name = "confirmQueue")
    public Queue confirmQueue(){
        return  new Queue("confirmQueue",true,false,false);
    }
    @Bean(name = "confirmExchange")
    public FanoutExchange confirmExchange(){
        return new FanoutExchange("confirmExchange");
    }
    @Bean
    public Binding confirmFanoutExchangeAndQueue(@Qualifier("confirmExchange") FanoutExchange confirmExchange,
                                                 @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange);
    }
}

④、消息發(fā)送確認(rèn)

發(fā)送消息確認(rèn):用來確認(rèn)生產(chǎn)者 producer 將消息發(fā)送到 broker ,broker 上的交換機(jī) exchange 再投遞給隊(duì)列 queue的過程中,消息是否成功投遞。

消息從 producer 到 rabbitmq broker有一個(gè) confirmCallback 確認(rèn)模式。

消息從 exchange 到 queue 投遞失敗有一個(gè) returnCallback 退回模式。

我們可以利用這兩個(gè)Callback來確保消息的100%送達(dá)。

1、 ConfirmCallback確認(rèn)模式

消息只要被 rabbitmq broker 接收到就會(huì)觸發(fā) confirmCallback 回調(diào) 。

package com.mm.springbootrabbitmqconfirmdemo.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause){
        if (!ack) {
            log.error("消息發(fā)送異常!");
        } else {
            log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

實(shí)現(xiàn)接口 ConfirmCallback ,重寫其confirm()方法,方法內(nèi)有三個(gè)參數(shù)correlationDataack、cause

  • correlationData:對(duì)象內(nèi)部只有一個(gè) id 屬性,用來表示當(dāng)前消息的唯一性。

  • ack:消息投遞到broker 的狀態(tài),true表示成功。

  • cause:表示投遞失敗的原因。

但消息被 broker 接收到只能表示已經(jīng)到達(dá) MQ服務(wù)器,并不能保證消息一定會(huì)被投遞到目標(biāo) queue 里。所以接下來需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投遞到目標(biāo) queue 里將觸發(fā)回調(diào) returnCallback ,一旦向 queue 投遞消息未成功,這里一般會(huì)記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?/p>

com.mm.springbootrabbitmqconfirmdemo.service;
lombok.extern.slf4j.;
org.springframework.amqp.core.Message;
org.springframework.amqp.rabbit.core.RabbitTemplate;
org.springframework.stereotype.;
ReturnCallbackService  RabbitTemplate.ReturnCallback returnedMessageMessage message, replyCode, String replyText, String exchange, String routingKey.info, replyCode, replyText, exchange, routingKey;

實(shí)現(xiàn)接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個(gè)參數(shù)message(消息體)、replyCode(響應(yīng)code)、replyText(響應(yīng)內(nèi)容)、exchange(交換機(jī))、routingKey(隊(duì)列)。

下邊是具體的消息發(fā)送,在rabbitTemplate中設(shè)置 Confirm 和 Return 回調(diào),我們通過setDeliveryMode()對(duì)消息做持久化處理,為了后續(xù)測(cè)試創(chuàng)建一個(gè) CorrelationData對(duì)象,添加一個(gè)id 為10000000000。

⑤、消息發(fā)送確認(rèn)

消息接收確認(rèn)要比消息發(fā)送確認(rèn)簡(jiǎn)單一點(diǎn),因?yàn)橹挥幸粋€(gè)消息回執(zhí)(ack)的過程。使用@RabbitHandler注解標(biāo)注的方法要增加 channel(信道)、message 兩個(gè)參數(shù)。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("小富收到消息:{}", msg);
            //TODO 具體業(yè)務(wù)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }  catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重復(fù)處理失敗,拒絕再次接收...");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                log.error("消息即將再次返回隊(duì)列處理...");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }
}

消費(fèi)消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。

1、basicAck

basicAck:表示成功確認(rèn),使用此回執(zhí)方法后,消息會(huì)被rabbitmq broker 刪除。

void?basicAck(long?deliveryTag,?boolean?multiple)

deliveryTag:表示消息投遞序號(hào),每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會(huì)增加。手動(dòng)消息確認(rèn)模式下,我們可以對(duì)指定deliveryTag的消息進(jìn)行ack、nack、reject等操作。

multiple:是否批量確認(rèn),值為 true 則會(huì)一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。

舉個(gè)栗子: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時(shí)deliveryTag為8,multiple設(shè)置為 true,會(huì)將5、6、7、8的消息全部進(jìn)行確認(rèn)。

2、basicNack

basicNack :表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時(shí)用到此方法,可以將消息重新投遞入隊(duì)列。

void?basicNack(long?deliveryTag,?boolean?multiple,?boolean?requeue)

deliveryTag:表示消息投遞序號(hào)。

multiple:是否批量確認(rèn)。

requeue:值為 true 消息將重新入隊(duì)列。

3、basicReject

basicReject:拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。

void?basicReject(long?deliveryTag,?boolean?requeue)

deliveryTag:表示消息投遞序號(hào)。

requeue:值為 true 消息將重新入隊(duì)列。

三、項(xiàng)目結(jié)構(gòu)圖

image.png

四、補(bǔ)充

1、別忘確認(rèn)消息

這是一個(gè)非常沒技術(shù)含量的坑,但卻是非常容易犯錯(cuò)的地方。

開啟消息確認(rèn)機(jī)制,消費(fèi)消息別忘了channel.basicAck,否則消息會(huì)一直存在,導(dǎo)致重復(fù)消費(fèi)。

2、消息無限投遞

在我最開始接觸消息確認(rèn)機(jī)制的時(shí)候,消費(fèi)端代碼就像下邊這樣寫的,思路很簡(jiǎn)單:處理完業(yè)務(wù)邏輯后確認(rèn)消息, int a = 1 / 0 發(fā)生異常后將消息重新投入隊(duì)列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("消費(fèi)者 2 號(hào)收到:{}", msg);
            int a = 1 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

3、重復(fù)消費(fèi)

如何保證 MQ 的消費(fèi)是冪等性,這個(gè)需要根據(jù)具體業(yè)務(wù)而定,可以借助MySQL、或者redis將消息持久化,通過再消息中的唯一性屬性校驗(yàn)。

可以看到使用了 RabbitMQ 以后,我們的業(yè)務(wù)鏈路明顯變長(zhǎng)了,雖然做到了系統(tǒng)間的解耦,但可能造成消息丟失的場(chǎng)景也增加了。例如:

  • 消息生產(chǎn)者 - > rabbitmq服務(wù)器(消息發(fā)送失?。?/p>

  • rabbitmq服務(wù)器自身故障導(dǎo)致消息丟失

  • 消息消費(fèi)者 - > rabbitmq服務(wù)(消費(fèi)消息失敗)

 到此這篇關(guān)于springboot實(shí)現(xiàn)rabbitmq消息確認(rèn)的示例代碼的文章就介紹到這了,更多相關(guān)springboot rabbitmq消息確認(rèn)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java讀寫oracle的blob字段示例

    java讀寫oracle的blob字段示例

    這篇文章主要介紹了java讀寫oracle的blob字段示例,需要的朋友可以參考下
    2014-02-02
  • Java并發(fā)編程中的ConcurrentLinkedQueue詳解

    Java并發(fā)編程中的ConcurrentLinkedQueue詳解

    這篇文章主要介紹了Java并發(fā)編程中的ConcurrentLinkedQueue詳解,GetThread線程不會(huì)因?yàn)镃oncurrentLinkedQueue隊(duì)列為空而等待,而是直接返回null,所以當(dāng)實(shí)現(xiàn)隊(duì)列不空時(shí),等待時(shí),則需要用戶自己實(shí)現(xiàn)等待邏輯,需要的朋友可以參考下
    2023-12-12
  • springboot項(xiàng)目讀取resources目錄下的文件的9種方式

    springboot項(xiàng)目讀取resources目錄下的文件的9種方式

    本文主要介紹了springboot項(xiàng)目讀取resources目錄下的文件的9種方式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Spring Boot多模塊化后,服務(wù)間調(diào)用的坑及解決

    Spring Boot多模塊化后,服務(wù)間調(diào)用的坑及解決

    這篇文章主要介紹了Spring Boot多模塊化后,服務(wù)間調(diào)用的坑及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • java面試常問的Runnable和Callable的區(qū)別

    java面試常問的Runnable和Callable的區(qū)別

    大家好,本篇文章主要講的是java面試常問的Runnable和Callable的區(qū)別,感興趣的同學(xué)趕快來看一看吧,對(duì)你有幫助的話記得收藏一下
    2022-01-01
  • Spring init-method與destroy-method屬性的用法解析

    Spring init-method與destroy-method屬性的用法解析

    這篇文章主要介紹了Spring init-method與destroy-method屬性的用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • 淺談Java 并發(fā)的底層實(shí)現(xiàn)

    淺談Java 并發(fā)的底層實(shí)現(xiàn)

    這篇文章主要介紹了淺談Java 并發(fā)的底層實(shí)現(xiàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-12-12
  • JavaWeb Session 會(huì)話管理實(shí)例詳解

    JavaWeb Session 會(huì)話管理實(shí)例詳解

    這篇文章主要介紹了JavaWeb Session 會(huì)話管理的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,感興趣的朋友一起看看吧
    2016-09-09
  • Ribbon和Feign的區(qū)別及說明

    Ribbon和Feign的區(qū)別及說明

    本文介紹了Spring Cloud Netflix中的兩個(gè)負(fù)載均衡組件:Ribbon和Feign,Ribbon是一個(gè)基于HTTP和TCP客戶端的負(fù)載均衡工具,使用起來較為繁瑣,而Feign是一個(gè)使用接口方式的HTTP客戶端,采用類似MyBatis的@Mapper注解方式,使得編寫客戶端變得非常容易
    2024-11-11
  • SpringBoot部署到騰訊云的實(shí)現(xiàn)示例

    SpringBoot部署到騰訊云的實(shí)現(xiàn)示例

    記錄一下自己第一次部署springboot項(xiàng)目,本文主要介紹了SpringBoot部署到騰訊云的實(shí)現(xiàn)示例,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-08-08

最新評(píng)論