亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)

 更新時(shí)間:2022年05月26日 08:38:53   作者:IT利刃出鞘  
這篇文章主要介紹了SpringBoot整合RabbitMQ是如何實(shí)現(xiàn)消息確認(rèn)的,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

簡(jiǎn)介

本文介紹SpringBoot整合RabbitMQ如何進(jìn)行消息的確認(rèn)。

生產(chǎn)者消息確認(rèn)

介紹

發(fā)送消息確認(rèn):用來(lái)確認(rèn)消息從 producer發(fā)送到 broker 然后broker 的 exchange 到 queue過(guò)程中,消息是否成功投遞。

如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)將消息寫(xiě)入磁盤(pán)之后發(fā)出;如果是鏡像隊(duì)列,所有鏡像接受成功后發(fā)確認(rèn)消息。

流程

  • 如果消息沒(méi)有到達(dá)exchange,則confirm回調(diào),ack=false
  • 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
  • exchange到queue成功,則不回調(diào)return
  • exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不會(huì)回調(diào),這樣消息就丟了)

配置

application.yml

# 發(fā)送者開(kāi)啟 confirm 確認(rèn)機(jī)制
spring.rabbitmq.publisher-confirms=true
# 發(fā)送者開(kāi)啟 return 確認(rèn)機(jī)制
spring.rabbitmq.publisher-returns=true

ConfirmCallback

ConfirmCallback:消息只要被 RabbitMQ broker 接收到就會(huì)觸發(fā)confirm方法。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>發(fā)送到broker失敗\r\n" + 
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>發(fā)送到broker成功\r\n" + 
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }
}

correlationData:對(duì)象內(nèi)部有id (消息的唯一性)和Message。

(若ack為false,則Message不為null,可將Message數(shù)據(jù) 重新投遞;若ack是true,則correlationData為null)

ack:消息投遞到exchange 的狀態(tài),true表示成功。

cause:表示投遞失敗的原因。 (若ack為false,則cause不為null;若ack是true,則cause為null)

給每一條信息添加一個(gè)dataId,放在CorrelationData,這樣在RabbitConfirmCallback返回失敗時(shí)可以知道哪個(gè)消息失敗。

public void send(String dataId, String exchangeName, String rountingKey, String message){
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId(dataId);
 
    rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
}
 
public String receive(String queueName){
    return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
}

2.1版本開(kāi)始,CorrelationData對(duì)象具有ListenableFuture,可用于獲取結(jié)果,而不是在rabbitTemplate上使用ConfirmCallback。

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

ReturnCallback

ReturnCallback:如果消息未能投遞到目標(biāo) queue 里將觸發(fā)returnedMessage方法。

若向 queue 投遞消息未成功,可記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?/p>

注意:需要rabbitTemplate.setMandatory(true);

當(dāng)mandatory設(shè)置為true時(shí),若exchange根據(jù)自身類型和消息routingKey無(wú)法找到一個(gè)合適的queue存儲(chǔ)消息,那么broker會(huì)調(diào)用basic.return方法將消息返還給生產(chǎn)者。當(dāng)mandatory設(shè)置為false時(shí),出現(xiàn)上述情況broker會(huì)直接將消息丟棄。

代碼:

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

message(消息體)、replyCode(響應(yīng)code)、replyText(響應(yīng)內(nèi)容)、exchange(交換機(jī))、routingKey(隊(duì)列)。 

注冊(cè)ConfirmCallback和ReturnCallback

整合后的寫(xiě)法

package com.example.config;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import javax.annotation.PostConstruct;
 
@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }
 
    // 下邊這樣寫(xiě)也可以    
    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    // @PostConstruct
    // public void init() {
    //     rabbitTemplate.setMandatory(true);
    //     rabbitTemplate.setReturnCallback(this);
    //     rabbitTemplate.setConfirmCallback(this);
    // }
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>發(fā)送到broker失敗\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>發(fā)送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }
 
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

消費(fèi)者消息確認(rèn)

介紹

確認(rèn)方式簡(jiǎn)介詳述
auto(默認(rèn))根據(jù)消息消費(fèi)的情況,智能判定若消費(fèi)者拋出異常,則mq不會(huì)收到確認(rèn)消息,mq會(huì)一直此消息發(fā)出去
若消費(fèi)者沒(méi)有拋出異常,則mq會(huì)收到確認(rèn)消息,mq不會(huì)再次將此消息發(fā)出去。
若消費(fèi)者在消費(fèi)時(shí)所在服務(wù)掛了,mq不會(huì)再次將此消息發(fā)出去。
nonemq發(fā)出消息后直接確認(rèn)消息 
manual消費(fèi)端手動(dòng)確認(rèn)消息消費(fèi)者調(diào)用 ack、nack、reject 幾種方法進(jìn)行確認(rèn),可以在業(yè)務(wù)失敗后進(jìn)行一些操作,如果消息未被 ACK 則消息還會(huì)存在于MQ,mq會(huì)一直將此消息發(fā)出去。
如果某個(gè)服務(wù)忘記 ACK 了,則 RabbitMQ 不會(huì)再發(fā)送數(shù)據(jù)給它,因?yàn)?RabbitMQ 認(rèn)為該服務(wù)的處理能力有限。

只要消息沒(méi)有被消費(fèi)者確認(rèn)(包括沒(méi)有自動(dòng)確認(rèn)),會(huì)導(dǎo)致消息一直被失敗消費(fèi),死循環(huán)導(dǎo)致消耗大量資源。正確的處理方式是:發(fā)生異常,將消息記錄到db,再通過(guò)補(bǔ)償機(jī)制來(lái)補(bǔ)償消息,或者記錄消息的重復(fù)次數(shù),進(jìn)行重試,超過(guò)幾次后再放到db中。

消息確認(rèn)三種方式配置方法

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring.rabbitmq.listener.direct.acknowledge-mode=manual

手動(dòng)確認(rèn)三種方式

basicAck,basicNack,basicReject

basicAck

含義

表示成功確認(rèn),使用此回執(zhí)方法后,消息會(huì)被RabbitMQ broker 刪除。

函數(shù)原型

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag

  • 消息投遞序號(hào)
  • 每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會(huì)增加。手動(dòng)消息確認(rèn)模式下,我們可以對(duì)指定deliveryTag的消息進(jìn)行ack、nack、reject等操作。

multiple

  • 是否批量確認(rèn)
  • 值為 true 則會(huì)一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。

示例: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒(méi)有被確認(rèn),當(dāng)我發(fā)第四條消息此時(shí)deliveryTag為8,multiple設(shè)置為 true,會(huì)將5、6、7、8的消息全部進(jìn)行確認(rèn)。

實(shí)例

@RabbitHandler
public void process(String content, Channel channel, Message message){
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

basicNack

含義

表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時(shí)用到此方法,可以將消息重新投遞入隊(duì)列。

函數(shù)原型

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:表示消息投遞序號(hào)。
  • multiple:是否批量確認(rèn)。
  • requeue:值為 true 消息將重新入隊(duì)列。

basicReject

含義

拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。

函數(shù)原型

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:表示消息投遞序號(hào)。
  • requeue:值為 true 消息將重新入隊(duì)列。

以上就是詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ消息確認(rèn)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論