SpringCloud微服務(wù)剔除下線功能實現(xiàn)原理分析
一、前言
上一篇SpringCloud微服務(wù)續(xù)約源碼解析已經(jīng)分析了心跳機(jī)制是什么、底層實現(xiàn)、客戶端發(fā)送心跳的主要代碼、注冊中心處理心跳的過程,這節(jié)跟它是緊密關(guān)聯(lián)的。聯(lián)系的樞紐就是lastUpdateTimestamp最后更新時間戳,它是Lease租約類的一個用volatile關(guān)鍵字修飾的對其他線程透明可見的字段。那么Eureka是如何使用該字段判斷服務(wù)是否過期的?然后進(jìn)行服務(wù)的剔除下線?需要借助什么機(jī)制?該機(jī)制是什么時候能觸發(fā)的?帶著這些問題,我們下面來探究一番:
二、微服務(wù)剔除下線源碼解析
EurekaBootStrap是Eureka項目里面的,用于啟動Eureka服務(wù)器的類:
Eureka 服務(wù)器使用類路徑中eureka.server.props指定的EurekaServerConfig進(jìn)行配置。Eureka客戶端組件也是通過使用eureka.client.props指定的配置 EurekaInstanceConfig初始化的。如果服務(wù)器在AWS云中運(yùn)行,則eureka服務(wù)器將其綁定到指定的彈性ip。
1、EurekaBootStrap#contextInitialized()
@Override public void contextInitialized(ServletContextEvent event) { try { initEurekaEnvironment(); // 初始化注冊中心上下文 initEurekaServerContext(); ServletContext sc = event.getServletContext(); sc.setAttribute(EurekaServerContext.class.getName(), serverContext); } catch (Throwable e) { logger.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
它這里也使用了事件機(jī)制,但是不是基于Spring的,感興趣的可以去了解下。初始化注冊中心上下文,即下面的處理邏輯:
1.1、初始化注冊中心上下文
protected void initEurekaServerContext() throws Exception { EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); logger.info("Initializing the eureka client..."); logger.info(eurekaServerConfig.getJsonCodecName()); ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig); ApplicationInfoManager applicationInfoManager = null; if (eurekaClient == null) { EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); applicationInfoManager = new ApplicationInfoManager( instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); } else { applicationInfoManager = eurekaClient.getApplicationInfoManager(); } PeerAwareInstanceRegistry registry; if (isAws(applicationInfoManager.getInfo())) { registry = new AwsInstanceRegistry( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager); awsBinder.start(); } else { registry = new PeerAwareInstanceRegistryImpl( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); } PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( registry, eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager ); serverContext = new DefaultEurekaServerContext( eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, applicationInfoManager ); EurekaServerContextHolder.initialize(serverContext); serverContext.initialize(); logger.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = registry.syncUp(); registry.openForTraffic(applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }
做一些初始化工作,重點(diǎn)關(guān)注registry.openForTraffic(applicationInfoManager, registryCount);的調(diào)用,進(jìn)入下面處理邏輯:
1.2、openForTraffic()邏輯
@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. // 更新每30秒發(fā)生一次,一分鐘應(yīng)該是2倍。 this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } // 更改服務(wù)實例狀態(tài)為UP logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 調(diào)用父類初始化 super.postInit(); }
更改服務(wù)實例狀態(tài)為UP,調(diào)用父類初始化。
1.3、postInit()執(zhí)行任務(wù)
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
終于來到剔除任務(wù)了,前面說了什么,就是一些初始化的工作。它這里的執(zhí)行器是Timer,跟Nacos不一樣,區(qū)別的話感興趣的就自行去搞個明白。我們進(jìn)入下面的分析:
1.4、剔除任務(wù)
EvictionTask是TimerTask類型任務(wù)。
class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } /** * 計算一個補(bǔ)償時間,該時間定義為自上一次迭代以來該任務(wù)的實際執(zhí)行時間,與配置的執(zhí)行時間量相比較。 * 這對于時間變化(例如由于時鐘偏差或 gc)導(dǎo)致實際的驅(qū)逐任務(wù)根據(jù)配置的周期在所需時間之后執(zhí)行的情況 * 非常有用。 */ long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return 0l; } long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0l ? 0l : compensationTime; } long getCurrentTimeNano() { // for testing return System.nanoTime(); } }
主要邏輯:
計算一個補(bǔ)償時間,該時間定義為自上一次迭代以來該任務(wù)的實際執(zhí)行時間,與配置的執(zhí)行時間量相比較。這對于時間變化(例如由于時鐘偏差或 gc)導(dǎo)致實際的驅(qū)逐任務(wù)根據(jù)配置的周期在所需時間之后執(zhí)行的情況非常有用。
調(diào)用evict(compensationTimeMs)剔除處理,下面分析:
2、服務(wù)剔除下線
2.1、AbstractInstanceRegistry#evict()邏輯
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { // DS: 租約到期目前已禁用。 logger.debug("DS: lease expiration is currently disabled."); return; } // 我們首先收集所有過期的物品,以隨機(jī)的順序驅(qū)逐它們。對于大型驅(qū)逐集,如果我們不這樣做, // 我們可能會在自我保護(hù)啟動之前刪除整個應(yīng)用程序。通過隨機(jī)化,影響應(yīng)該均勻地分布在所有應(yīng)用程序中。 List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); // 判斷租約是否過期 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { // 添加到過期續(xù)租集合 expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. // 為了補(bǔ)償 GC 暫?;蚱频谋镜貢r間,我們需要使用當(dāng)前的注冊表大小作為觸發(fā)自我保存的基礎(chǔ)。 // 沒有這個,我們就會清除整個注冊表。 // 獲取注冊表租約總數(shù) int registrySize = (int) getLocalRegistrySize(); // 計算注冊表租約的閾值 (總數(shù)乘以 續(xù)租百分比 默認(rèn)85%),得出要續(xù)租的數(shù)量 int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); // 理論要剔除的數(shù)量 = 總數(shù)-要續(xù)租的數(shù)量 int evictionLimit = registrySize - registrySizeThreshold; // 實際剔除的數(shù)量 = min(實際租期到期服務(wù)實例個數(shù),理論剔除數(shù)量) int toEvict = Math.min(expiredLeases.size(), evictionLimit); // 將要剔除數(shù)量大于0,把它們下線處理,從本地注冊表移除掉以保證高可用 if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // 選擇一個隨機(jī)的項目(Knuth 洗牌算法) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); // 注冊表: {}/{}的租約已過期 logger.warn("DS: Registry: expired lease for {}/{}", appName, id); // 服務(wù)下線 internalCancel(appName, id, false); } } }
主要邏輯:
- 判斷租約到期是否禁用,如果禁用return。默認(rèn)啟用
- 首先收集所有過期的租約,以隨機(jī)的順序剔除它們。對于大型剔除集,如果不這樣做,可能會在自我保護(hù)啟動之前刪除整個應(yīng)用程序。通過隨機(jī)化,影響應(yīng)該均勻地分布在所有應(yīng)用程序中。判斷租約是否過期,如果過期添加到過期租約集合,繼續(xù)遍歷到。
- 為了補(bǔ)償 GC 暫停或漂移的本地時間,需要使用當(dāng)前的注冊表大小作為觸發(fā)自我保存的基礎(chǔ)。沒有這個,就會清除整個注冊表。1)獲取注冊表租約總數(shù);2)計算注冊表租約的閾值 (總數(shù)乘以 續(xù)租百分比 默認(rèn)85%),得出要續(xù)租的數(shù)量;3)理論要剔除的數(shù)量 = 總數(shù)-要續(xù)租的數(shù)量;4)實際剔除的數(shù)量 = min(實際租期到期服務(wù)實例個數(shù),理論剔除數(shù)量);
- 將要剔除數(shù)量大于0,把它們下線處理,從本地注冊表移除掉以保證高可用:選擇一個隨機(jī)的項目(Knuth 洗牌算法),調(diào)用internalCancel(appName, id, false)下線處理。
2.1、判斷是否過期
public boolean isExpired(long additionalLeaseMs) { return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); }
如果是cancel()處理前面的值就大于0,一般是判斷后面部分邏輯:如果當(dāng)前系統(tǒng)時間戳小于后面的時間戳之和,則沒有過期;否則大于就是過期了。
duration的值也可以通過配置文件更改,通過yml配置文件中eureka:instance:lease-expiration-duration-in-seconds:指定,不過必須大于eureka:instance:lease-renewal-interval-in-seconds默認(rèn)值或指定值。設(shè)置duration太長可能意味著即使實例不存在,流量也可能被路由到該實例。將此值設(shè)置得太小可能意味著,由于臨時網(wǎng)絡(luò)故障,該實例可能會從流量中刪除。因此duration的值要設(shè)置為至少高于eureka:instance:lease-renewal-interval-in-seconds中默認(rèn)的或指定的值。
2.2、從本地列表異常下線處理
cancel(String,String,boolean)方法被PeerAwareInstanceRegistry重寫了,因此每個取消請求都被復(fù)制到對等點(diǎn)。然而,對于在遠(yuǎn)程對等點(diǎn)中被視為有效取消的過期,這是不需要的,因此自我保存模式不會啟用。
protected boolean internalCancel(String appName, String id, boolean isReplication) { // 加鎖 read.lock(); try { CANCEL.increment(isReplication); // 根據(jù)appName從本地注冊表獲取租約服務(wù)實例 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToCancel = null; if (gMap != null) { // 根據(jù)唯一ID從本地移除服務(wù)實例,下線 leaseToCancel = gMap.remove(id); } recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")")); InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); if (instanceStatus != null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } if (leaseToCancel == null) { // 下線失敗,因為租約信息中不存在該服務(wù)實例 CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { // 通過更新剔除時間取消租約。 leaseToCancel.cancel(); // 從租約獲取服務(wù)實例 InstanceInfo instanceInfo = leaseToCancel.getHolder(); String vip = null; String svip = null; if (instanceInfo != null) { instanceInfo.setActionType(ActionType.DELETED); recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); instanceInfo.setLastUpdatedTimestamp(); vip = instanceInfo.getVIPAddress(); svip = instanceInfo.getSecureVipAddress(); } // 使特定應(yīng)用程序的緩存失效 invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); } } finally { // 釋放鎖 read.unlock(); } synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } return true; }
主要邏輯:
- 獲取鎖后,根據(jù)appName從本地注冊表獲取租約服務(wù)實例
- 根據(jù)唯一ID從本地移除服務(wù)實例,下線
- 如果需下線租約信息為空,則下線失敗,因為租約信息中不存在該服務(wù)實例,return假;否則可能通過更新剔除時間取消租約,從租約獲取服務(wù)實例以便使特定應(yīng)用程序的緩存失效
- 釋放鎖
到此這篇關(guān)于SpringCloud微服務(wù)剔除下線功能實現(xiàn)原理分析的文章就介紹到這了,更多相關(guān)SpringCloud微服務(wù)剔除下線內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中定時任務(wù)@Scheduled的多線程使用詳解
這篇文章主要為大家詳細(xì)介紹了pring Boot定時任務(wù)@Scheduled的多線程原理以及如何加入線程池來處理定時任務(wù),感興趣的可以了解一下2023-04-04Spring Cloud Hystrix實現(xiàn)服務(wù)容錯的方法
Hystrix是SpringCloud中重要的熔斷保護(hù)組件,由Netflix開源,主要提供延遲和容錯管理,以保障分布式系統(tǒng)的高可用性和魯棒性,通過封裝依賴項實現(xiàn)服務(wù)間隔離,引入回退邏輯應(yīng)對依賴服務(wù)故障,有效防止系統(tǒng)崩潰和服務(wù)級聯(lián)故障2024-10-10Mybatis實現(xiàn)數(shù)據(jù)的增刪改查實例(CRUD)
本篇文章主要介紹了Mybatis實現(xiàn)數(shù)據(jù)的增刪改查實例(CRUD),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-05-05SpringBoot實現(xiàn)阿里云快遞物流查詢的示例代碼
本文將基于springboot實現(xiàn)快遞物流查詢,物流信息的獲取通過阿里云第三方實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2021-10-10