亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

java重試機(jī)制使用RPC必須考慮冪等性原理解析

 更新時(shí)間:2023年03月19日 09:16:14   作者:JAVA前線  
這篇文章主要為大家介紹了java重試機(jī)制使用RPC必須考慮冪等性原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

文章概述

在RPC場(chǎng)景中因?yàn)橹卦嚮蛘邲](méi)有實(shí)現(xiàn)冪等機(jī)制而導(dǎo)致的重復(fù)數(shù)據(jù)問(wèn)題,必須引起大家重視,有可能會(huì)造成例如一次購(gòu)買創(chuàng)建多筆訂單,一條通知信息被發(fā)送多次等問(wèn)題,這是技術(shù)人員必須面對(duì)和解決的問(wèn)題。

有人可能會(huì)說(shuō):當(dāng)調(diào)用失敗時(shí)程序并沒(méi)有顯示重試,為什么還會(huì)產(chǎn)生重復(fù)數(shù)據(jù)問(wèn)題呢?這是因?yàn)榧词箾](méi)有顯示重試,RPC框架在集群容錯(cuò)機(jī)制中自動(dòng)進(jìn)行了重試,這個(gè)問(wèn)題必須引起關(guān)注。

本文我們以DUBBO框架為例分析為什么重試,怎么做重試,怎么做冪等三個(gè)問(wèn)題。

1 為什么重試

如果簡(jiǎn)單對(duì)一個(gè)RPC交互過(guò)程進(jìn)行分類,我們可以分為三類:響應(yīng)成功、響應(yīng)失敗、沒(méi)有響應(yīng)。

對(duì)于響應(yīng)成功和響應(yīng)失敗這兩種情況,消費(fèi)者很好處理。因?yàn)轫憫?yīng)信息明確,所以只要根據(jù)響應(yīng)信息,繼續(xù)處理成功或者失敗邏輯即可。但是沒(méi)有響應(yīng)這種場(chǎng)景比較難處理,這是因?yàn)闆](méi)有響應(yīng)可能包含以下情況:

(1) 生產(chǎn)者根本沒(méi)有接收到請(qǐng)求

(2) 生產(chǎn)者接收到請(qǐng)求并且已處理成功,但是消費(fèi)者沒(méi)有接收到響應(yīng)

(3) 生產(chǎn)者接收到請(qǐng)求并且已處理失敗,但是消費(fèi)者沒(méi)有接收到響應(yīng)

假設(shè)你是一名RPC框架設(shè)計(jì)者,究竟是選擇重試還是放棄調(diào)用呢?其實(shí)最終如何選擇取決于業(yè)務(wù)特性,有的業(yè)務(wù)本身就具有冪等性,但是有的業(yè)務(wù)不能允許重試否則會(huì)造成重復(fù)數(shù)據(jù)。

那么誰(shuí)對(duì)業(yè)務(wù)特性最熟悉呢?答案是消費(fèi)者,因?yàn)橄M(fèi)者作為調(diào)用方肯定最熟悉自身業(yè)務(wù),所以RPC框架只要提供一些策略供消費(fèi)者選擇即可。

2 怎么做重試

2.1 集群容錯(cuò)策略

DUBBO作為一款優(yōu)秀RPC框架,提供了如下集群容錯(cuò)策略供消費(fèi)者選擇:

Failover: 故障轉(zhuǎn)移
Failfast: 快速失敗
Failsafe: 安全失敗
Failback: 異步重試
Forking:  并行調(diào)用
Broadcast:廣播調(diào)用

(1) Failover

故障轉(zhuǎn)移策略。作為默認(rèn)策略當(dāng)消費(fèi)發(fā)生異常時(shí)通過(guò)負(fù)載均衡策略再選擇一個(gè)生產(chǎn)者節(jié)點(diǎn)進(jìn)行調(diào)用,直到達(dá)到重試次數(shù)

(2) Failfast

快速失敗策略。消費(fèi)者只消費(fèi)一次服務(wù),當(dāng)發(fā)生異常時(shí)則直接拋出

(3) Failsafe

安全失敗策略。消費(fèi)者只消費(fèi)一次服務(wù),如果消費(fèi)失敗則包裝一個(gè)空結(jié)果,不拋出異常

(4) Failback

異步重試策略。當(dāng)消費(fèi)發(fā)生異常時(shí)返回一個(gè)空結(jié)果,失敗請(qǐng)求將會(huì)進(jìn)行異步重試。如果重試超過(guò)最大重試次數(shù)還不成功,放棄重試并不拋出異常

(5) Forking

并行調(diào)用策略。消費(fèi)者通過(guò)線程池并發(fā)調(diào)用多個(gè)生產(chǎn)者,只要有一個(gè)成功就算成功

(6) Broadcast

廣播調(diào)用策略。消費(fèi)者遍歷調(diào)用所有生產(chǎn)者節(jié)點(diǎn),任何一個(gè)出現(xiàn)異常則拋出異常

2.2 源碼分析

2.2.1 Failover

Failover故障轉(zhuǎn)移策略作為默認(rèn)策略,當(dāng)消費(fèi)發(fā)生異常時(shí)通過(guò)負(fù)載均衡策略再選擇一個(gè)生產(chǎn)者節(jié)點(diǎn)進(jìn)行調(diào)用,直到達(dá)到重試次數(shù)。即使業(yè)務(wù)代碼沒(méi)有顯示重試,也有可能多次執(zhí)行消費(fèi)邏輯從而造成重復(fù)數(shù)據(jù):

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 所有生產(chǎn)者Invokers
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 獲取重試次數(shù)
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null;
        // 已經(jīng)調(diào)用過(guò)的生產(chǎn)者
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);
        // 重試直到達(dá)到最大次數(shù)
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                // 如果當(dāng)前實(shí)例被銷毀則拋出異常
                checkWhetherDestroyed();
                // 根據(jù)路由策略選出可用生產(chǎn)者Invokers
                copyInvokers = list(invocation);
                // 重新檢查
                checkInvokers(copyInvokers, invocation);
            }
            // 負(fù)載均衡選擇一個(gè)生產(chǎn)者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 服務(wù)消費(fèi)發(fā)起遠(yuǎn)程調(diào)用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                }
                // 有結(jié)果則返回
                return result;
            } catch (RpcException e) {
                // 業(yè)務(wù)異常直接拋出
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // RpcException不拋出繼續(xù)重試
                le = new RpcException(e.getMessage(), e);
            } finally {
                // 保存已經(jīng)訪問(wèn)過(guò)的生產(chǎn)者
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

消費(fèi)者調(diào)用生產(chǎn)者節(jié)點(diǎn)A發(fā)生RpcException異常時(shí)(例如超時(shí)異常),在未達(dá)到最大重試次數(shù)之前,消費(fèi)者會(huì)通過(guò)負(fù)載均衡策略再次選擇其它生產(chǎn)者節(jié)點(diǎn)消費(fèi)。試想如果生產(chǎn)者節(jié)點(diǎn)A其實(shí)已經(jīng)處理成功了,但是沒(méi)有及時(shí)將成功結(jié)果返回給消費(fèi)者,那么再次重試可能就會(huì)造成重復(fù)數(shù)據(jù)問(wèn)題。

2.2.2 Failfast

快速失敗策略。消費(fèi)者只消費(fèi)一次服務(wù),當(dāng)發(fā)生異常時(shí)則直接拋出,不會(huì)進(jìn)行重試:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 檢查生產(chǎn)者Invokers是否合法
        checkInvokers(invokers, invocation);
        // 負(fù)載均衡選擇一個(gè)生產(chǎn)者Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 服務(wù)消費(fèi)發(fā)起遠(yuǎn)程調(diào)用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 服務(wù)消費(fèi)失敗不重試直接拋出異常
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                   "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                   + " select from all providers " + invokers + " for service " + getInterface().getName()
                                   + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                   + " use dubbo version " + Version.getVersion()
                                   + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                                   e.getCause() != null ? e.getCause() : e);
        }
    }
}

2.2.3 Failsafe

安全失敗策略。消費(fèi)者只消費(fèi)一次服務(wù),如果消費(fèi)失敗則包裝一個(gè)空結(jié)果,不拋出異常,不會(huì)進(jìn)行重試:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            // 檢查生產(chǎn)者Invokers是否合法
            checkInvokers(invokers, invocation);
            // 負(fù)載均衡選擇一個(gè)生產(chǎn)者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            // 服務(wù)消費(fèi)發(fā)起遠(yuǎn)程調(diào)用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 消費(fèi)失敗包裝為一個(gè)空結(jié)果對(duì)象
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}

2.2.4 Failback

異步重試策略。當(dāng)消費(fèi)發(fā)生異常時(shí)返回一個(gè)空結(jié)果,失敗請(qǐng)求將會(huì)進(jìn)行異步重試。如果重試超過(guò)最大重試次數(shù)還不成功,放棄重試并不拋出異常:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
    private static final long RETRY_FAILED_PERIOD = 5;
    private final int retries;
    private final int failbackTasks;
    private volatile Timer failTimer;
    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);
        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }
    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // 創(chuàng)建定時(shí)器
                    failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // 構(gòu)造定時(shí)任務(wù)
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            // 定時(shí)任務(wù)放入定時(shí)器等待執(zhí)行
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }
    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {
            // 檢查生產(chǎn)者Invokers是否合法
            checkInvokers(invokers, invocation);
            // 負(fù)責(zé)均衡選擇一個(gè)生產(chǎn)者Invoker
            invoker = select(loadbalance, invocation, invokers, null);
            // 消費(fèi)服務(wù)發(fā)起遠(yuǎn)程調(diào)用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
            // 如果服務(wù)消費(fèi)失敗則記錄失敗請(qǐng)求
            addFailed(loadbalance, invocation, invokers, invoker);
            // 返回空結(jié)果
            return new RpcResult();
        }
    }
    @Override
    public void destroy() {
        super.destroy();
        if (failTimer != null) {
            failTimer.stop();
        }
    }
    /**
     * RetryTimerTask
     */
    private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;
        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker = lastInvoker;
        }
        @Override
        public void run(Timeout timeout) {
            try {
                // 負(fù)載均衡選擇一個(gè)生產(chǎn)者Invoker
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;
                // 服務(wù)消費(fèi)發(fā)起遠(yuǎn)程調(diào)用
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                // 超出最大重試次數(shù)記錄日志不拋出異常
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    // 未超出最大重試次數(shù)重新放入定時(shí)器
                    rePut(timeout);
                }
            }
        }
        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }
            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }
            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
}

2.2.5 Forking

并行調(diào)用策略。消費(fèi)者通過(guò)線程池并發(fā)調(diào)用多個(gè)生產(chǎn)者,只要有一個(gè)成功就算成功:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            // 獲取配置參數(shù)
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 獲取并行執(zhí)行的Invoker列表
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // 選擇生產(chǎn)者
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // 防止重復(fù)增加Invoker
                    if (!selected.contains(invoker)) {
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {
                // 在線程池中并發(fā)執(zhí)行
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 執(zhí)行消費(fèi)邏輯
                            Result result = invoker.invoke(invocation);
                            // 存儲(chǔ)消費(fèi)結(jié)果
                            ref.offer(result);
                        } catch (Throwable e) {
                            // 如果異常次數(shù)大于等于forks參數(shù)值說(shuō)明全部調(diào)用失敗,則把異常放入隊(duì)列
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // 從隊(duì)列獲取結(jié)果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // 如果異常類型表示全部調(diào)用失敗則拋出異常
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }
}

2.2.6 Broadcast

廣播調(diào)用策略。消費(fèi)者遍歷調(diào)用所有生產(chǎn)者節(jié)點(diǎn),任何一個(gè)出現(xiàn)異常則拋出異常:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        // 遍歷調(diào)用所有生產(chǎn)者節(jié)點(diǎn)
        for (Invoker<T> invoker : invokers) {
            try {
                // 執(zhí)行消費(fèi)邏輯
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        // 任何一個(gè)出現(xiàn)異常則拋出異常
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}

3 怎么做冪等

經(jīng)過(guò)上述分析我們知道,RPC框架自帶的重試機(jī)制可能會(huì)造成數(shù)據(jù)重復(fù)問(wèn)題,那么在使用中必須考慮冪等性。冪等性是指一次操作與多次操作產(chǎn)生結(jié)果相同,并不會(huì)因?yàn)槎啻尾僮鞫a(chǎn)生不一致性。常見(jiàn)冪等方案有取消重試、冪等表、數(shù)據(jù)庫(kù)鎖、狀態(tài)機(jī)。

3.1 取消重試

取消重試有兩種方法,第一是設(shè)置重試次數(shù)為零,第二是選擇不重試的集群容錯(cuò)策略。

<!-- 設(shè)置重試次數(shù)為零 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />
<!-- 選擇集群容錯(cuò)方案 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />

3.2 冪等表

假設(shè)用戶支付成功后,支付系統(tǒng)將支付成功消息,發(fā)送至消息隊(duì)列。物流系統(tǒng)訂閱到這個(gè)消息,準(zhǔn)備為這筆訂單創(chuàng)建物流單。

但是消息隊(duì)列可能會(huì)重復(fù)推送,物流系統(tǒng)有可能接收到多次這條消息。我們希望達(dá)到效果是:無(wú)論接收到多少條重復(fù)消息,只能創(chuàng)建一筆物流單。

解決方案是冪等表方案。新建一張冪等表,該表就是用來(lái)做冪等,無(wú)其它業(yè)務(wù)意義,有一個(gè)字段名為key建有唯一索引,這個(gè)字段是冪等標(biāo)準(zhǔn)。

物流系統(tǒng)訂閱到消息后,首先嘗試插入冪等表,訂單編號(hào)作為key字段。如果成功則繼續(xù)創(chuàng)建物流單,如果訂單編號(hào)已經(jīng)存在則違反唯一性原則,無(wú)法插入成功,說(shuō)明已經(jīng)進(jìn)行過(guò)業(yè)務(wù)處理,丟棄消息。

這張表數(shù)據(jù)量會(huì)比較大,我們可以通過(guò)定時(shí)任務(wù)對(duì)數(shù)據(jù)進(jìn)行歸檔,例如只保留7天數(shù)據(jù),其它數(shù)據(jù)存入歸檔表。

還有一種廣義冪等表就是我們可以用Redis替代數(shù)據(jù)庫(kù),在創(chuàng)建物流單之前,我們可以檢查Redis是否存在該訂單編號(hào)數(shù)據(jù),同時(shí)可以為這類數(shù)據(jù)設(shè)置7天過(guò)期時(shí)間。

3.3 狀態(tài)機(jī)

物流單創(chuàng)建成功后會(huì)發(fā)送消息,訂單系統(tǒng)訂閱到消息后更新?tīng)顟B(tài)為完成,假設(shè)變更是將訂單狀態(tài)0更新至狀態(tài)1。訂單系統(tǒng)也可能收到多條消息,可能在狀態(tài)已經(jīng)被更新至狀態(tài)1之后,依然收到物流單創(chuàng)建成功消息。

解決方案是狀態(tài)機(jī)方案。首先繪制狀態(tài)機(jī)圖,分析狀態(tài)流轉(zhuǎn)形態(tài)。例如經(jīng)過(guò)分析狀態(tài)1已經(jīng)是最終態(tài),那么即使接收到物流單創(chuàng)建成功消息也不再處理,丟棄消息。

3.4 數(shù)據(jù)庫(kù)鎖

數(shù)據(jù)庫(kù)鎖又可以分為悲觀鎖和樂(lè)觀鎖兩種類型,悲觀鎖是在獲取數(shù)據(jù)時(shí)加鎖:

select * from table where col='xxx' for update 

樂(lè)觀鎖是在更新時(shí)加鎖,第一步首先查出數(shù)據(jù),數(shù)據(jù)包含version字段。第二步進(jìn)行更新操作,如果此時(shí)記錄已經(jīng)被修改則version字段已經(jīng)發(fā)生變化,無(wú)法更新成功:

update table set xxx,
version = #{version} + 1 
where id = #{id} 
and version = #{version}

4 文章總結(jié)

本文首先分析了為什么重試這個(gè)問(wèn)題,因?yàn)閷?duì)于RPC交互無(wú)響應(yīng)場(chǎng)景,重試是一種重要選擇。然后分析了DUBBO提供的六種集群容錯(cuò)策略,F(xiàn)ailover作為默認(rèn)策略提供了重試機(jī)制,在業(yè)務(wù)代碼沒(méi)有顯示重試情況下,仍有可能發(fā)起多次調(diào)用,這必須引起重視。

最后我們分析了幾種常用冪等方案,希望本文對(duì)大家有所幫助,更多關(guān)于重試機(jī)制RPC冪等性的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • RocketMQ之NameServer架構(gòu)設(shè)計(jì)及啟動(dòng)關(guān)閉流程源碼分析

    RocketMQ之NameServer架構(gòu)設(shè)計(jì)及啟動(dòng)關(guān)閉流程源碼分析

    這篇文章主要為大家介紹了RocketMQ之NameServer架構(gòu)設(shè)計(jì)及啟動(dòng)關(guān)閉流程源碼分析詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2021-11-11
  • Java詳細(xì)分析連接數(shù)據(jù)庫(kù)的流程

    Java詳細(xì)分析連接數(shù)據(jù)庫(kù)的流程

    Java數(shù)據(jù)庫(kù)連接,JDBC是Java語(yǔ)言中用來(lái)規(guī)范客戶端程序如何來(lái)訪問(wèn)數(shù)據(jù)庫(kù)的應(yīng)用程序接口,提供了諸如查詢和更新數(shù)據(jù)庫(kù)中數(shù)據(jù)的方法。JDBC也是Sun Microsystems的商標(biāo)。我們通常說(shuō)的JDBC是面向關(guān)系型數(shù)據(jù)庫(kù)的
    2022-05-05
  • Spring Boot集成Spring Cache過(guò)程詳解

    Spring Boot集成Spring Cache過(guò)程詳解

    這篇文章主要介紹了Spring Boot集成Spring Cache過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • Lombok不生效,提示java:?找不到符號(hào)的解決方案

    Lombok不生效,提示java:?找不到符號(hào)的解決方案

    這篇文章主要介紹了Lombok不生效,提示java:?找不到符號(hào)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • 解決SpringBoot項(xiàng)目啟動(dòng)后網(wǎng)頁(yè)顯示Please sign in的問(wèn)題

    解決SpringBoot項(xiàng)目啟動(dòng)后網(wǎng)頁(yè)顯示Please sign in的問(wèn)題

    這篇文章主要介紹了解決SpringBoot項(xiàng)目啟動(dòng)后網(wǎng)頁(yè)顯示Please sign in的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • IDEA 當(dāng)前在線人數(shù)和歷史訪問(wèn)量的示例代碼

    IDEA 當(dāng)前在線人數(shù)和歷史訪問(wèn)量的示例代碼

    這篇文章主要介紹了IDEA 當(dāng)前在線人數(shù)和歷史訪問(wèn)量的實(shí)例代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-08-08
  • RabbitMQ消息隊(duì)列的目錄結(jié)構(gòu)

    RabbitMQ消息隊(duì)列的目錄結(jié)構(gòu)

    這篇文章主要介紹了RabbitMQ消息隊(duì)列的目錄結(jié)構(gòu),RabbitMQ?屬于消息中間件,主要用于組件之間的解耦,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然,那么用了那么久RabbitMQ,其目錄結(jié)構(gòu)是怎樣的呢,讓我們一起來(lái)看一下吧
    2023-08-08
  • JAVA不可變類(immutable)機(jī)制與String的不可變性(推薦)

    JAVA不可變類(immutable)機(jī)制與String的不可變性(推薦)

    這篇文章主要介紹了JAVA不可變類(immutable)機(jī)制與String的不可變性(推薦)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2016-08-08
  • SpringMVC攔截器——實(shí)現(xiàn)登錄驗(yàn)證攔截器的示例代碼

    SpringMVC攔截器——實(shí)現(xiàn)登錄驗(yàn)證攔截器的示例代碼

    本篇文章主要介紹了SpringMVC攔截器——實(shí)現(xiàn)登錄驗(yàn)證攔截器的示例代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。
    2017-02-02
  • Java構(gòu)造器方法深入理解

    Java構(gòu)造器方法深入理解

    這篇文章主要介紹了Java構(gòu)造器方法深入理解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09

最新評(píng)論