Nacos配置中心集群原理及源碼分析
Nacos作為配置中心,必然需要保證服務(wù)節(jié)點的高可用性,那么Nacos是如何實現(xiàn)集群的呢?
下面這個圖,表示Nacos集群的部署圖。
Nacos集群工作原理
Nacos作為配置中心的集群結(jié)構(gòu)中,是一種無中心化節(jié)點的設(shè)計,由于沒有主從節(jié)點,也沒有選舉機制,所以為了能夠?qū)崿F(xiàn)熱備,就需要增加虛擬IP(VIP)。
Nacos的數(shù)據(jù)存儲分為兩部分
- Mysql數(shù)據(jù)庫存儲,所有Nacos節(jié)點共享同一份數(shù)據(jù),數(shù)據(jù)的副本機制由Mysql本身的主從方案來解決,從而保證數(shù)據(jù)的可靠性。
- 每個節(jié)點的本地磁盤,會保存一份全量數(shù)據(jù),具體路徑:/data/program/nacos-1/data/config-data/${GROUP}.
在Nacos的設(shè)計中,Mysql是一個中心數(shù)據(jù)倉庫,且認為在Mysql中的數(shù)據(jù)是絕對正確的。 除此之外,Nacos在啟動時會把Mysql中的數(shù)據(jù)寫一份到本地磁盤。
這么設(shè)計的好處是可以提高性能,當客戶端需要請求某個配置項時,服務(wù)端會想Ian從磁盤中讀取對應(yīng)文件返回,而磁盤的讀取效率要比數(shù)據(jù)庫效率高。
當配置發(fā)生變更時:
- Nacos會把變更的配置保存到數(shù)據(jù)庫,然后再寫入本地文件。
- 接著發(fā)送一個HTTP請求,給到集群中的其他節(jié)點,其他節(jié)點收到事件后,從Mysql中dump剛剛寫入的數(shù)據(jù)到本地文件中。
另外,NacosServer啟動后,會同步啟動一個定時任務(wù),每隔6小時,會dump一次全量數(shù)據(jù)到本地文件
配置變更同步入口
當配置發(fā)生修改、刪除、新增操作時,通過發(fā)布一個notifyConfigChange
事件。
@PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { //省略.. if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true); ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true); ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } }//省略 return true; }
AsyncNotifyService
配置數(shù)據(jù)變更事件,專門有一個監(jiān)聽器AsyncNotifyService,它會處理數(shù)據(jù)變更后的同步事件。
@Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; // Register ConfigDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe ConfigDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { // Generate ConfigDataChangeEvent concurrently if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表 // 構(gòu)建NotifySingleTask,并添加到隊列中。 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); for (Member member : ipList) { //遍歷集群中的每個節(jié)點 queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } //異步執(zhí)行任務(wù) AsyncTask ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); } } @Override public Class<? extends Event> subscribeType() { return ConfigDataChangeEvent.class; } }); }
AsyncTask
@Override public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() { while (!queue.isEmpty()) {//遍歷隊列中的數(shù)據(jù),直到數(shù)據(jù)為空 NotifySingleTask task = queue.poll(); //獲取task String targetIp = task.getTargetIP(); //獲取目標ip if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目標ip // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify //判斷目標ip的健康狀態(tài) boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); // if (unHealthNeedDelay) { //如果目標服務(wù)是非健康,則繼續(xù)添加到隊列中,延后再執(zhí)行。 // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task asyncTaskExecute(task); } else { //構(gòu)建header Header header = Header.newInstance(); header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP()); if (task.isBeta) { header.addParam("isBeta", "true"); } AuthHeaderUtil.addIdentityToHeader(header); //通過restTemplate發(fā)起遠程調(diào)用,如果調(diào)用成功,則執(zhí)行AsyncNotifyCallBack的回調(diào)方法 restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task)); } } } }
目標節(jié)點接收請求
數(shù)據(jù)同步的請求地址為,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP
@GetMapping("/dataChange") public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tag", required = false) String tag) { dataId = dataId.trim(); group = group.trim(); String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED); long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified); String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP); String isBetaStr = request.getHeader("isBeta"); if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { // dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; }
dumpService.dump用來實現(xiàn)配置的更新,代碼如下
當前任務(wù)會被添加到DumpTaskMgr中管理。
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag); dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); }
TaskManager.addTask, 先調(diào)用父類去完成任務(wù)添加。
@Override public void addTask(Object key, AbstractDelayTask newTask) { super.addTask(key, newTask); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); }
在這種場景設(shè)計中,一般都會采用生產(chǎn)者消費者模式來完成,因此這里不難猜測到,任務(wù)會被保存到一個隊列中,然后有另外一個線程來執(zhí)行。
NacosDelayTaskExecuteEngine
TaskManager的父類是NacosDelayTaskExecuteEngine,
這個類中有一個成員屬性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,專門來保存延期執(zhí)行的任務(wù)類型AbstractDelayTask.
在這個類的構(gòu)造方法中,初始化了一個延期執(zhí)行的任務(wù),其中具體的任務(wù)是ProcessRunnable.
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); }
ProcessRunnable
private class ProcessRunnable implements Runnable { @Override public void run() { try { processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } } }
processTasks
protected void processTasks() { //獲取所有的任務(wù) Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } //獲取任務(wù)處理器,這里返回的是DumpProcessor NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed //執(zhí)行具體任務(wù) if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } }
DumpProcessor.process
讀取數(shù)據(jù)庫的最新數(shù)據(jù),然后更新本地緩存和磁盤
以上就是Nacos配置中心集群原理及源碼分析的詳細內(nèi)容,更多關(guān)于Nacos配置中心集群原理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA 2020.1 搜索不到Chinese (Simplified) Language
小編在安裝中文插件時遇到IDEA 2020.1 搜索不到Chinese ​(Simplified)​ Language Pack EAP,無法安裝的問題,本文給大家分享我的解決方法,感興趣的朋友一起看看吧2020-04-04spring @Profiles和@PropertySource實現(xiàn)根據(jù)環(huán)境切換配置文件
這篇文章主要介紹了spring @Profiles和@PropertySource根據(jù)環(huán)境切換配置文件,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11解決Java?結(jié)構(gòu)化數(shù)據(jù)處理開源庫?SPL的問題
這篇文章主要介紹了Java?結(jié)構(gòu)化數(shù)據(jù)處理開源庫?SPL的問題,Scala提供了較豐富的結(jié)構(gòu)化數(shù)據(jù)計算函數(shù),但編譯型語言的特點,也使它不能成為理想的結(jié)構(gòu)化數(shù)據(jù)計算類庫,對此內(nèi)容感興趣的朋友一起看看吧2022-03-03log4j中l(wèi)ogger標簽中additivity屬性的用法說明
這篇文章主要介紹了log4j中l(wèi)ogger標簽中additivity屬性的用法說明,基于很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12