亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

rabbitmq消息ACK確認(rèn)機(jī)制及發(fā)送失敗處理方式

 更新時間:2023年12月15日 09:29:51   作者:古甲哈醒  
這篇文章主要介紹了rabbitmq消息ACK確認(rèn)機(jī)制及發(fā)送失敗處理方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

rabbitmq為確保消息發(fā)送和接收成功,采用ack機(jī)制。

(1)生產(chǎn)者producter發(fā)送消息到mq時,mq會發(fā)送ack給producter告知消息是否投遞成功;

(2)消費(fèi)者consumer接收處理消息后,consumer會發(fā)送ack給mq告知消息是否處理成功;

通過ack機(jī)制,確保消息能夠被producter成功發(fā)送和consumer成功接收處理,保證消息不丟失。

1、消息發(fā)送

rabbitmq消息發(fā)送分為兩個階段:

(1)producter將消息發(fā)送到broker,即發(fā)送到exchage交換機(jī);

(2)消息通過交換機(jī)exchange被路由到隊列queue;

消息只有被正確投遞到隊列queue中,才算發(fā)送成功。

消息發(fā)送代碼:

    public boolean send(String queueName, String json, String msgId){
        Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設(shè)置消息持久化
        CorrelationDataExt correlationData = new CorrelationDataExt();
        correlationData.setId(msgId);
        correlationData.setData(json);
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);//設(shè)置手工ack確認(rèn)
        rabbitTemplate.setConfirmCallback(this);//ack回調(diào)
        rabbitTemplate.setReturnCallback(this);//回退回調(diào)
        rabbitTemplate.convertAndSend(queueName, message, correlationData);
        return true;
    }

在消息發(fā)送之前,我們要設(shè)置ack機(jī)制相關(guān)參數(shù): 

  • setMandatory:設(shè)置手工確認(rèn)ack;
  • setConfirmCallback:設(shè)置消息發(fā)送到exchange結(jié)果回調(diào);
  • setReturnCallback:設(shè)置消息投遞到queue失敗回退時回調(diào);

通過上述兩個回調(diào)方法,我們能夠?qū)Πl(fā)送失敗的消息進(jìn)行重發(fā)處理,確保消息不丟失。

2、消息發(fā)送失敗

根據(jù)rabbitmq發(fā)送過程,消息發(fā)送失敗的有三種情況會出現(xiàn):

(1)producter連接mq失敗,消息沒有發(fā)送到mq

(2)producter連接mq成功,但是發(fā)送到exchange失敗

(3)消息發(fā)送到exchange成功,但是路由到queue失??;

3、發(fā)送失敗處理

(1)producter連接mq失敗,消息沒有發(fā)送到mq

這種情況下,在發(fā)送消息時可以通過捕捉AmqpException異常,將消息保存db中后續(xù)進(jìn)行重發(fā)處理。

        try{
            rabbitTemplate.convertAndSend(queueName, message, correlationData);
        }catch (Exception e){
            logger.error("連接MQ失敗", e);
            //todo 存儲到db中進(jìn)行重發(fā)
        }

(2)producter連接mq成功,但是發(fā)送到exchange失敗

通過實現(xiàn)ConfirmCallback接口,對發(fā)送結(jié)果進(jìn)行處理。

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String msgId = correlationData.getId();
        if(ack){
            //發(fā)送成功
            logger.debug("ack,消息投遞到exchange成功,msgId:{}",msgId);
        }else{
            //發(fā)送失敗,重試
            logger.error("ack,消息投遞exchange失敗,msgId:{},原因{}" ,msgId, cause);
           
        }
    }

confirm方法有3個參數(shù),correlationData是消息發(fā)送時攜帶的數(shù)據(jù)對象,ack消息是否成功發(fā)送到exchange,cause是發(fā)送失敗時的原因。

通過ack我們可以判斷發(fā)送到exchange是否成功,如果ack=false,則我們進(jìn)行失敗處理。 

但是這里存在一個問題,correlationData里面只有一個id屬性,沒有關(guān)于消息內(nèi)容的屬性,對于數(shù)據(jù)失敗處理非常不方便。

為解決此問題,我們可以自定義一個CorrelationData擴(kuò)展對象,繼承CorrelationData,并添加自己想要保存數(shù)據(jù)的屬性,在消息發(fā)送時,攜帶相關(guān)數(shù)據(jù)在該對象上即可。

自定義CorrelationData對象:

/**
 * CorrelationData的自定義實現(xiàn),用于拿到消息內(nèi)容
 */
public class CorrelationDataExt extends CorrelationData {
    //數(shù)據(jù)
    private volatile Object data;
    //隊列
    private String queueName;

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
}

重寫發(fā)送方法,使用CorrelationDataExt對象攜帶數(shù)據(jù):

    public boolean send(String queueName, String json, String msgId){
        Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設(shè)置消息持久化

        //使用自定義的數(shù)據(jù)對象
        CorrelationDataExt correlationData = new CorrelationDataExt();
        correlationData.setId(msgId);
        correlationData.setData(json);
        correlationData.setQueueName(queueName);

        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);//設(shè)置手工ack確認(rèn)
        rabbitTemplate.setConfirmCallback(this);//設(shè)置發(fā)送成功回調(diào)
        rabbitTemplate.setReturnCallback(this);//設(shè)置消息回退回調(diào)
        try{
            rabbitTemplate.convertAndSend(queueName, message, correlationData);//使用amqp default exchange direct
        }catch (Exception e){
            logger.error("MQ連接失敗,請聯(lián)系管理員處理!!!!");
            //保存到db重發(fā)
            saveToDB(msgId, json, queueName, "90");
        }
        return true;
    }

重寫confirm方法,對CorrelationData進(jìn)行處理:

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String msgId = correlationData.getId();
        if(ack){
            //發(fā)送成功
            logger.debug("ack,消息投遞到exchange成功,msgId:{}",msgId);
        }else{
            //發(fā)送失敗,重試
            logger.error("ack,消息投遞exchange失敗,msgId:{},原因{}" ,msgId, cause);
            if(correlationData instanceof CorrelationDataExt){
                CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
                String message = (String) correlationDataExt.getData();
                String queueName = ((CorrelationDataExt) correlationData).getQueueName();
                saveToDB(msgId, message, queueName, "91");
            }else{
                logger.info("correlationData對象不包含數(shù)據(jù)");
            }
        }
    }

(3)消息發(fā)送到exchange成功,但是路由到queue失敗

通過實現(xiàn)ReturnCallback接口,對回退消息進(jìn)行重發(fā)處理。

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("消息發(fā)送失敗-消息回退,應(yīng)答碼:{},原因:{},交換機(jī):{},路由鍵:{}", replyCode, replyText, exchange, routingKey);
        String msgId = message.getMessageProperties().getCorrelationId();
        String data = new String(message.getBody());
        saveToDB(msgId, data, routingKey, "92");
    }

關(guān)于對失敗消息的處理,我這里是統(tǒng)一保存到DB中,后續(xù)通過定時任務(wù)進(jìn)行重發(fā)處理的。

通過以上3個方面對失敗消息的處理,可以確保消息能夠成功發(fā)送到mq,確保不丟失。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • java二維數(shù)組實現(xiàn)推箱子小游戲

    java二維數(shù)組實現(xiàn)推箱子小游戲

    這篇文章主要為大家詳細(xì)介紹了java二維數(shù)組實現(xiàn)推箱子小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-11-11
  • springboot實現(xiàn)郵箱驗證碼功能

    springboot實現(xiàn)郵箱驗證碼功能

    這篇文章主要為大家詳細(xì)介紹了springboot實現(xiàn)郵箱驗證碼功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-02-02
  • Feign Client 超時時間配置不生效的解決

    Feign Client 超時時間配置不生效的解決

    這篇文章主要介紹了Feign Client 超時時間配置不生效的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java實現(xiàn)動態(tài)獲取圖片驗證碼的示例代碼

    Java實現(xiàn)動態(tài)獲取圖片驗證碼的示例代碼

    這篇文章主要介紹了Java實現(xiàn)動態(tài)獲取圖片驗證碼的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • 詳解Spring-Boot集成Spring session并存入redis

    詳解Spring-Boot集成Spring session并存入redis

    這篇文章主要介紹了詳解Spring-Boot集成Spring session并存入redis,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-05-05
  • MyBatis逆向工程基本操作及代碼實例

    MyBatis逆向工程基本操作及代碼實例

    這篇文章主要介紹了MyBatis逆向工程基本操作及代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-07-07
  • Java實現(xiàn)跳躍表(skiplist)的簡單實例

    Java實現(xiàn)跳躍表(skiplist)的簡單實例

    這篇文章主要介紹了Java編程中跳躍表的概念和實現(xiàn)原理,并簡要敘述了它的結(jié)構(gòu),具有一定參考價值,需要的朋友可以了解下。
    2017-09-09
  • Spring @Transactional注解的聲明式事務(wù)簡化業(yè)務(wù)邏輯中的事務(wù)管理

    Spring @Transactional注解的聲明式事務(wù)簡化業(yè)務(wù)邏輯中的事務(wù)管理

    這篇文章主要為大家介紹了Spring @Transactional注解的聲明式事務(wù)簡化業(yè)務(wù)邏輯中的事務(wù)管理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • Spring?Boot?中的?Native?SQL基本概念及使用方法

    Spring?Boot?中的?Native?SQL基本概念及使用方法

    在本文中,我們介紹了 Spring Boot 中的 Native SQL,以及如何使用 JdbcTemplate 和 NamedParameterJdbcTemplate 來執(zhí)行自定義的 SQL 查詢或更新語句,需要的朋友跟隨小編一起看看吧
    2023-07-07
  • 詳解Spring Boot 目錄文件結(jié)構(gòu)

    詳解Spring Boot 目錄文件結(jié)構(gòu)

    這篇文章主要介紹了Spring Boot 目錄文件結(jié)構(gòu)的相關(guān)資料,文中示例代碼非常詳細(xì),幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-07-07

最新評論