dubbo服務注冊到nacos的過程剖析
前言
前面聊到到了我們的dubbo服務從redis遷移到nacos注冊中心,遷移后發(fā)現(xiàn),會時不時的拋一個異常 ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:, 所以有了這個剖析過程,當然最后查明異常是我們的SLB網絡映射問題,和nacos沒有關系。
- dubbo版本:2.7.4.1
- nacos client版本:1.0.0
- nacos server版本:1.1.3
簡述過程
- dubbo側:dubbo通過nacos注冊中心實現(xiàn),注冊服務到nacos,同時添加心跳任務,心跳任務每隔5s發(fā)送一次服務健康心跳。同時每隔1s查詢nacos服務列表是否有更新,如果有更新觸發(fā)服務實例更新通知,更新dubbo本地服務列表
- nacos側:nacos接收到心跳后,如果此時服務實例不存在,則新建一個服務實例,如果此時服務實例不健康,則設置為健康狀態(tài),并主動推送狀態(tài)到客戶端。nacos內部有一個檢查服務狀態(tài)的任務,如果15s沒有健康心跳上報,則設置服務實例不健康,如果30s沒有健康心跳上報,則下線這個服務實例,并推送狀態(tài)到客戶端。
源碼剖析具體實現(xiàn)
在dubbo的registry包下,針對服務注冊行為定義了四個接口,所有的服務注冊(zookeeper、nacos、redis、etcd等)支持都是這些接口的實現(xiàn)
- NotifyListener:服務變更通知監(jiān)聽的接口定義,在實現(xiàn)注冊中心時不需關心實現(xiàn),對接具體監(jiān)聽器往下傳遞這個實例就好
- RegistryService:服務注冊、取消注冊、定義、取消訂閱、服務查找的接口定義,是最核心的一個接口,包含了注冊中心實現(xiàn)的核心功能
- Registry:對RegistryService、Node的包裝,多了檢測服務是否可用,服務銷毀下線的方法,一般直接實現(xiàn)Registry接口
- RegistryFactory:通過注冊中心URL獲取注冊中心實現(xiàn)的接口定義,dubbo的spi設計,針對每個具體實現(xiàn),映射了一個注冊中心協(xié)議頭,如nacos實現(xiàn)對應了nacos:// 新對接一個注冊中心,并不需要直接實現(xiàn)Registry接口,可直接繼承FailbackRegistry抽象類,實現(xiàn)相關的do方法即可。dubbo針對服務注冊的抽象和nacos服務注冊的抽象非常契合,大部分接口可以直接對接使用,只有服務訂閱監(jiān)聽器的定義不一樣,稍微包裝轉換下即可,所以實現(xiàn)起來就非常簡單了。
服務注冊
org.apache.dubbo.registry.nacos.NacosRegistry:152
@Override
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
execute(namingService -> namingService.registerInstance(serviceName, instance));
}dubbo中,所以的服務都被封裝成了URL,對應nacos中的服務實例Instance,所以服務注冊時,只需要簡單的將URL轉換成Instance就可以注冊到nacos中,下面看看namingService中的具體注冊行為。
com.alibaba.nacos.client.naming.NacosNamingService:283
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}如上代碼,除了注冊實例外,還判斷了instance實例是否是臨時實例,如果是臨時實例,則加入了beatReactor的心跳列表。這是因為,nacos將服務分成了兩類,一類是臨時性的服務, 像dubbo、spring cloud這種,需要通過心跳來?;睿绻奶鴽]有及時發(fā)送,服務端會自動下線這個instance。一類是永久性服務,如數(shù)據庫、緩存服務等, 客戶端不會也沒法發(fā)送心跳,這類服務就由服務端通過TCP端口檢測等方式反向探活。下面看看臨時實例的心跳是怎么發(fā)送的。
com.alibaba.nacos.client.naming.NacosNamingService:104
private int initClientBeatThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
}
return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
//可通過配置dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10設置維護心跳的線程數(shù)先看一段獲取心跳beatReactor線程池線程數(shù)量的初始化代碼,傳入的Properties是配置dubbo注冊中心時的參數(shù)列表,如果配置了namingClientBeatThreadCount,則取配置的值, 默認維護心跳的線程池大小為:如果是單核的,就是一個線程,多核的就CPU核心數(shù)一半的線程。繼續(xù)心跳邏輯
com.alibaba.nacos.client.naming.beat.BeatReactor:78
class BeatProcessor implements Runnable {
@Override
public void run() {
try {
for (Map.Entry entry : dom2Beat.entrySet()) {
BeatInfo beatInfo = entry.getValue();
if (beatInfo.isScheduled()) {
continue;
}
beatInfo.setScheduled(true);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
} finally {
executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
}
}
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
long result = serverProxy.sendBeat(beatInfo);
beatInfo.setScheduled(false);
if (result > 0) {
clientBeatInterval = result;
}
}
}dom2Beat是一個存放需要心跳上報的臨時實例的map容器,NacosNamingService.registerInstance中通過判斷臨時節(jié)點添加到心跳列表的邏輯, 最終添加到了這個map里。BeatReactor初始化后會觸發(fā)BeatProcessor線程的調用,BeatProcessor線程是一個不斷自我觸發(fā)調用的線程,前一次 心跳上報邏輯執(zhí)行完后,間隔5S觸發(fā)下一次心跳上報。間隔時間由變量clientBeatInterval控制,受nacos服務端返回的心跳結果值的影響 心跳間隔可能會改變,nacos服務端從instance的元數(shù)據中尋找key為preserved.heart.beat.interval的值返回,如果為空則返回5S。 這個功能在dubbo2.7.4.1的版本里還不成熟,只能通過注解元素指定,如@Reference(parameters = "preserved.heart.beat.interval,10000"), 后面如果能夠直接在注冊中心的url參數(shù)配置就算成熟了,所以這個功能暫時不推薦使用,可以作為實驗功能試試。
服務訂閱
org.apache.dubbo.registry.nacos.NacosRegistry:399
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
if (!nacosListeners.containsKey(serviceName)) {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
notifySubscriber(url, listener, e.getInstances());
}
};
namingService.subscribe(serviceName, eventListener);
nacosListeners.put(serviceName, eventListener);
}
}nacos的服務監(jiān)聽是EventListener,所以dubbo的服務訂閱只需要將NotifyListener的處理包裝進onEvent中處理即可, 通過namingService.subscribe添加nacos的訂閱。最終EventListener對象會被添加到事件調度器的監(jiān)聽器列表中,見如下代碼:
com.alibaba.nacos.client.naming.core.EventDispatcher:
public class EventDispatcher {
private ExecutorService executor = null;
private BlockingQueuechangedServices = new LinkedBlockingQueue();
private ConcurrentMap observerMap = new ConcurrentHashMap();
public EventDispatcher() {
executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
thread.setDaemon(true);
return thread;
}
});
executor.execute(new Notifier());
}
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
Listobservers = Collections.synchronizedList(new ArrayList());
observers.add(listener);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}
serviceChanged(serviceInfo);
}
public void removeListener(String serviceName, String clusters, EventListener listener) {
NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
Listobservers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
if (observers != null) {
Iteratoriter = observers.iterator();
while (iter.hasNext()) {
EventListener oldListener = iter.next();
if (oldListener.equals(listener)) {
iter.remove();
}
}
if (observers.isEmpty()) {
observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
}
}
}
public ListgetSubscribeServices() {
ListserviceInfos = new ArrayList();
for (String key : observerMap.keySet()) {
serviceInfos.add(ServiceInfo.fromKey(key));
}
return serviceInfos;
}
public void serviceChanged(ServiceInfo serviceInfo) {
if (serviceInfo == null) {
return;
}
changedServices.add(serviceInfo);
}
private class Notifier implements Runnable {
@Override
public void run() {
while (true) {
ServiceInfo serviceInfo = null;
try {
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}
if (serviceInfo == null) {
continue;
}
try {
Listlisteners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
Listhosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] notify error for service: "
+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
}
}
}
}
public void setExecutor(ExecutorService executor) {
ExecutorService oldExecutor = this.executor;
this.executor = executor;
oldExecutor.shutdown();
}
}EventDispatcher中維護了一個監(jiān)聽器列表observerMap,同時維護了一個事件變更的阻塞隊列changedServices,監(jiān)聽調度器初始化后,會觸發(fā)一個線程消費阻塞隊列的 數(shù)據,當注冊服務發(fā)生變化時,將變更數(shù)據入隊,就能喚醒線程更新dubbo內存中的服務列表了。上面已經聊到,nacos client會以1s的頻次拉取注冊的實例,當拉取到的實例和本地內存的 有出入時,就會觸發(fā)入隊操作,如:
com.alibaba.nacos.client.naming.core.HostReactor:296
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
@Override
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
}DEFAULT_DELAY值為1s,同時,nacos也會主動的推送數(shù)據變更事件,當遇到nacos主動推送時,serviceInfoMap中的serviceObj會被更新,那么下次 nacos client拉取的時間間隔會被設置成10S之后,具體的和本地列表比對的邏輯都在updateServiceNow方法內,這里就不展開講述了。
結語
dubbo注冊服務到nacos以及訂閱服務是一個比較復雜的過程,在剖析的過程中,帶著疑問去看源碼會有事半功倍的效果,比如博主在看源碼前, 首先是為了尋找nacos的心跳異常,然后對nacos如何實現(xiàn)事件監(jiān)聽比較好奇。然后層層剖析漸進明朗恍然大悟。當然在剖析dubbo注冊服務到nacos時,也需要了解 nacos服務端的處理邏輯,nacos服務端非常核心的兩個類ClientBeatCheckTask、ClientBeatProcessor,包含了心跳處理、健康檢測和事件推送的邏輯, 有興趣可以看看
以上就是dubbo服務注冊到nacos的過程剖析的詳細內容,更多關于dubbo服務注冊到nacos的資料請關注腳本之家其它相關文章!
相關文章
spring-@Autowired注入與構造函數(shù)注入使用方式
這篇文章主要介紹了spring-@Autowired注入與構造函數(shù)注入使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
解決IDEA?JDK9沒有module-info.java的問題
這篇文章主要介紹了解決IDEA?JDK9沒有module-info.java的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
解決SpringBoot的@DeleteMapping注解的方法不被調用問題
這篇文章主要介紹了解決SpringBoot的@DeleteMapping注解的方法不被調用問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01
Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢
這篇文章主要介紹了Mybatis-Plus insertBatch執(zhí)行緩慢的原因查詢,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11

