Nacos集群數(shù)據(jù)同步方式
引言
在Nacos屬于集群時,當(dāng)服務(wù)器收到服務(wù)注冊請求后,發(fā)生了ClientEvent.ClientChangedEvent事件,就會觸發(fā)將注冊的服務(wù)信息同步給集群中的其他Nacos-server節(jié)點。
// DistroClientDataProcessor private void syncToAllServer(ClientEvent event) { Client client = event.getClient(); // Only ephemeral data sync by Distro, persist client should sync by raft. if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; } if (event instanceof ClientEvent.ClientDisconnectEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.DELETE); } else if (event instanceof ClientEvent.ClientChangedEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.CHANGE); } }
同步時,會涉及到一個負責(zé)節(jié)點和非負責(zé)節(jié)點
負責(zé)節(jié)點(發(fā)起同步)
也就是收到客戶端事件ClientChangedEvent后負責(zé)同步信息給其他非負責(zé)節(jié)點, 所以這里只能有負責(zé)節(jié)點來進行同步,非負責(zé)節(jié)點只能接收同步事件
// DistroClientDataProcessor // Only ephemeral data sync by Distro, persist client should sync by raft. if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; }
DistroProtocol
Distro是阿里巴巴的私有協(xié)議,distro協(xié)議是為了注冊中心而創(chuàng)造出的協(xié)議;
DistroProtocol會循環(huán)所有其他nacos節(jié)點,提交一個異步任務(wù),這個異步任務(wù)會延遲1s,其實這里我們就可以看到這里涉及到客戶端的斷開和客戶端的新增和修改,對于Delete操作,由DistroSyncDeleteTask處理,對于Change操作,由DistroSyncChangeTask處理,這里我們從DistroSyncChangeTask來看
// DistroProtocol public void sync(DistroKey distroKey, DataOperation action, long delay) { for (Member each : memberManager.allMembersWithoutSelf()) { syncToTarget(distroKey, action, each.getAddress(), delay); } }
在調(diào)用syncToTarget后,會觸發(fā)任務(wù)DistroDelayTaskProcessor處理任務(wù),這是Distro協(xié)議的一個默認延遲任務(wù)處理器,可以看到。 對于刪除類型的任務(wù),觸發(fā)任務(wù)DistroSyncDeleteTask , 對于刪除的任務(wù):DistroSyncChangeTask
public class DistroDelayTaskProcessor implements NacosTaskProcessor { @Override public boolean process(NacosTask task) { DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); switch (distroDelayTask.getAction()) { case DELETE: DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask); return true; case CHANGE: case ADD: DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; default: return false; } } }
DistroSyncChangeTask
public class DistroSyncChangeTask extends AbstractDistroExecuteTask { ... // 無回調(diào) @Override protected boolean doExecute() { String type = getDistroKey().getResourceType(); DistroData distroData = getDistroData(type); if (null == distroData) { Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString()); return true; } return getDistroComponentHolder().findTransportAgent(type) .syncData(distroData, getDistroKey().getTargetServer()); } // 有回調(diào) @Override protected void doExecuteWithCallback(DistroCallback callback) { String type = getDistroKey().getResourceType(); DistroData distroData = getDistroData(type); if (null == distroData) { Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString()); return; } //將得到的數(shù)據(jù)同步給其他服務(wù)節(jié)點 getDistroComponentHolder().findTransportAgent(type) .syncData(distroData, getDistroKey().getTargetServer(), callback); } // 從DistroClientDataProcessor獲取DistroData private DistroData getDistroData(String type) { DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey()); if (null != result) { result.setType(OPERATION); } return result; } }
獲取同步數(shù)據(jù)getDistroData
這里獲取同步數(shù)據(jù)其實是從DistroClientDataProcessor 中獲取的,所以為Client的相關(guān)注冊服務(wù)信息
// DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor @Override public DistroData getDistroData(DistroKey distroKey) { Client client = clientManager.getClient(distroKey.getResourceKey()); if (null == client) { return null; } byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData()); return new DistroData(distroKey, data); }
可以看到generateSyncData 方法是關(guān)鍵獲取服務(wù)的方法,該方法提供了同步數(shù)據(jù),包含Client的注冊信息,包括客戶端注冊了哪些namespace,哪些group,哪些service,哪些instance。
// AbstractClient implements Client @Override public ClientSyncData generateSyncData() { List<String> namespaces = new LinkedList<>(); List<String> groupNames = new LinkedList<>(); List<String> serviceNames = new LinkedList<>(); List<InstancePublishInfo> instances = new LinkedList<>(); for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) { namespaces.add(entry.getKey().getNamespace()); groupNames.add(entry.getKey().getGroup()); serviceNames.add(entry.getKey().getName()); instances.add(entry.getValue()); } return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances); }
執(zhí)行同步數(shù)據(jù)syncData
這里的同步實際是由DistroClientTransportAgent來負責(zé)的,將數(shù)據(jù)分裝成DistroDataRequest 然后查詢到對于的服務(wù)節(jié)點Member然后調(diào)用asyncRequest異步方法執(zhí)行同步,后面的方法我就不跟了, 這時我們主要關(guān)注非負責(zé)節(jié)點收到同步請求后如何處理。
// DistroClientTransportAgent @Override public void syncData(DistroData data, String targetServer, DistroCallback callback) { if (isNoExistTarget(targetServer)) { callback.onSuccess(); return; } DistroDataRequest request = new DistroDataRequest(data, data.getType()); Member member = memberManager.find(targetServer); try { clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member)); } catch (NacosException nacosException) { callback.onFailed(nacosException); } }
非負責(zé)節(jié)點(接收請求)
當(dāng)負責(zé)節(jié)點將數(shù)據(jù)發(fā)送給非負責(zé)節(jié)點以后,將要處理發(fā)送過來的Client數(shù)據(jù)。通過DistroController收到數(shù)據(jù)后, 然后最終會DistroClientDataProcessor.processData方法來進行處理
// DistroController.java @PutMapping("/datum") public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception { ... DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue()); distroProtocol.onReceive(distroHttpData); ... }
// DistroClientDataProcessor.java @Override public boolean processData(DistroData distroData) { switch (distroData.getType()) { case ADD: case CHANGE: ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class) .deserialize(distroData.getContent(), ClientSyncData.class); handlerClientSyncData(clientSyncData); return true; case DELETE: String deleteClientId = distroData.getDistroKey().getResourceKey(); Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId); clientManager.clientDisconnected(deleteClientId); return true; default: return false; } }
可以看出,這里分別對ADD/CHANGE和DELETE進行了處理,這里我主要關(guān)注ADD/CHANGE,所以主要關(guān)注handlerClientSyncData方法。
private void handlerClientSyncData(ClientSyncData clientSyncData) { Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId()); // 同步客戶端連接,此時如果客戶端不存在,則會注冊一個非負責(zé)節(jié)點client,后面就會獲取到該客戶端操作 clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes()); // 獲取Client(此時注冊到的是ConnectionBasedClient) Client client = clientManager.getClient(clientSyncData.getClientId()); // 更新Client數(shù)據(jù) upgradeClient(client, clientSyncData); }
**注意:**這里要注意下此時的Client實現(xiàn)類ConnectionBasedClient,它的isNative屬性為false,這是非負責(zé)節(jié)點和負責(zé)節(jié)點的主要區(qū)別。
其實判斷當(dāng)前nacos節(jié)點是否為負責(zé)節(jié)點的依據(jù)就是這個**isNative屬性**,如果是客戶端直接注冊在這個nacos節(jié)點上的ConnectionBasedClient,它的isNative屬性為true;如果是由Distro協(xié)議,同步到這個nacos節(jié)點上的ConnectionBasedClient,它的isNative屬性為false。
那其實我們都知道2.x的版本以后使用了長連接,所以**通過長連接建立在哪個節(jié)點上,哪個節(jié)點就是責(zé)任節(jié)點,客戶端也只會向這個責(zé)任節(jié)點發(fā)送請求**。
DistroClientDataProcessor的upgradeClient方法,更新Client里的注冊表信息,發(fā)布對應(yīng)事件
private void upgradeClient(Client client, ClientSyncData clientSyncData) { List<String> namespaces = clientSyncData.getNamespaces(); List<String> groupNames = clientSyncData.getGroupNames(); List<String> serviceNames = clientSyncData.getServiceNames(); List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos(); Set<Service> syncedService = new HashSet<>(); for (int i = 0; i < namespaces.size(); i++) { Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i)); Service singleton = ServiceManager.getInstance().getSingleton(service); syncedService.add(singleton); InstancePublishInfo instancePublishInfo = instances.get(i); if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) { client.addServiceInstance(singleton, instancePublishInfo); NotifyCenter.publishEvent( new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId())); } } for (Service each : client.getAllPublishedService()) { if (!syncedService.contains(each)) { client.removeServiceInstance(each); NotifyCenter.publishEvent( new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId())); } } }
Distro協(xié)議負責(zé)集群數(shù)據(jù)統(tǒng)一
Distro為了確保集群間數(shù)據(jù)一致,不僅僅依賴于數(shù)據(jù)發(fā)生改變時的實時同步,后臺有定時任務(wù)做數(shù)據(jù)同步。
在1.x版本中,責(zé)任節(jié)點每5s同步所有Service的Instance列表的摘要(md5)給非責(zé)任節(jié)點,非責(zé)任節(jié)點用對端傳來的服務(wù)md5比對本地服務(wù)的md5,如果發(fā)生改變,需要反查責(zé)任節(jié)點。
在2.x版本中,對這個流程做了改造,責(zé)任節(jié)點會發(fā)送Client全量數(shù)據(jù),非責(zé)任節(jié)點定時檢測同步過來的Client是否過期,減少1.x版本中的反查。
責(zé)任節(jié)點每5s向其他節(jié)點發(fā)送DataOperation=VERIFY類型的DistroData,來維持非責(zé)任節(jié)點的Client數(shù)據(jù)不過期。
// DistroVerifyTimedTask.java @Override public void run() { // 所有其他節(jié)點 List<Member> targetServer = serverMemberManager.allMembersWithoutSelf(); for (String each : distroComponentHolder.getDataStorageTypes()) { // 遍歷這些節(jié)點發(fā)送Client.isNative=true的DistroData,type = VERIFY verifyForDataStorage(each, targetServer); } }
非責(zé)任節(jié)點每5s掃描isNative=false的client,如果client 30s內(nèi)沒有被VERIFY的DistroData更新過續(xù)租時間,會刪除這個同步過來的Client數(shù)據(jù)。
//ConnectionBasedClientManager->ExpiredClientCleaner private static class ExpiredClientCleaner implements Runnable { @Override public void run() { long currentTime = System.currentTimeMillis(); for (String each : clientManager.allClientId()) { ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each); if (null != client && client.isExpire(currentTime)) { clientManager.clientDisconnected(each); } } } } // ConnectionBasedClient.java @Override public boolean isExpire(long currentTime) { // 判斷30s內(nèi)沒有續(xù)租 認為過期 return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime(); }
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot2.1 RESTful API項目腳手架(種子)項目
這篇文章主要介紹了SpringBoot2.1 RESTful API項目腳手架(種子)項目,用于搭建RESTful API工程的腳手架,只需三分鐘你就可以開始編寫業(yè)務(wù)代碼,不再煩惱于構(gòu)建項目與風(fēng)格統(tǒng)一,感興趣的小伙伴們可以參考一下2018-12-12EditPlus運行java時從鍵盤輸入數(shù)據(jù)的操作方法
這篇文章主要介紹了EditPlus運行java時從鍵盤輸入數(shù)據(jù)的操作方法,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別
這篇文章主要介紹了關(guān)于ObjectUtils.isEmpty()?和?null?的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02使用Logback設(shè)置property參數(shù)方式
這篇文章主要介紹了使用Logback設(shè)置property參數(shù)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03詳解Java實現(xiàn)多種方式的http數(shù)據(jù)抓取
本篇文章主要介紹了Java實現(xiàn)多種方式的http數(shù)據(jù)抓取,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧。2016-12-12