亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RocketMQ中的NameServer詳細(xì)解析

 更新時(shí)間:2024年01月03日 10:39:22   作者:潛水路人甲  
這篇文章主要介紹了RocketMQ中的NameServer詳細(xì)解析,NameServer是一個(gè)非常簡單的Topic路由注冊中心,支持Broker的動(dòng)態(tài)注冊與發(fā)現(xiàn),因此不能保證NameServer的一致性,需要的朋友可以參考下

前言

NameServer是一個(gè)非常簡單的Topic路由注冊中心,支持Broker的動(dòng)態(tài)注冊與發(fā)現(xiàn)。

Producer和Conumser通過NameServer可以知道整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。

NameServer各實(shí)例間相互不進(jìn)行信息通訊,因此不能保證NameServer的一致性(Consistency),可以保證可用性(Availability)。

即選擇了CAP中的AP。NameServer只能保證最終一致性,關(guān)于怎么保證最終一致性后文再講。

現(xiàn)在先從NameServer的啟動(dòng)開始。

NameServer為namesrv模塊

NamesrvStartup

public static NamesrvController main0(String[] args) {
//構(gòu)造NamesrvController 
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            return controller;
}
    public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化
        boolean initResult = controller.initialize();
//啟動(dòng)
        controller.start();
        return controller;
    }
 

NamesrvStartup作為NameServer的啟動(dòng)類,主要做了三件事:

  • 構(gòu)造NamesrvController(NameServer控制器)
  • 加載初始化,由NamesrvController負(fù)責(zé)
  • 啟動(dòng)remotingServer,開啟Netty服務(wù)

NamesrvController

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
 
    //nameSrv的配置
    private final NamesrvConfig namesrvConfig;
    //netty的配置
    private final NettyServerConfig nettyServerConfig;
 
    //執(zhí)行單線程的任務(wù)調(diào)度,自定義編程名稱
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
 
    //kv的配置管理
    private final KVConfigManager kvConfigManager;
 
    //路由管理器,有broker的ip和隊(duì)列信息,producer發(fā)送的queue信息,consumer的pull的queue信息
    private final RouteInfoManager routeInfoManager;
 
    //namesrv的netty的服務(wù)端實(shí)現(xiàn)
    private RemotingServer remotingServer;
 
    //處理接受到請求事件的回調(diào)監(jiān)聽服務(wù),主要處理netty的事件
    private BrokerHousekeepingService brokerHousekeepingService;
 
    private ExecutorService remotingExecutor;
 
    private Configuration configuration;
    private FileWatchService fileWatchService;
 
    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        //構(gòu)造kv配置的管理
        this.kvConfigManager = new KVConfigManager(this);
        //構(gòu)造路由信息管理
        this.routeInfoManager = new RouteInfoManager();
        //構(gòu)造網(wǎng)絡(luò)連接事件管理
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        //配置
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

NamesrvController作為一個(gè)控制器主要負(fù)責(zé)NettyServer的創(chuàng)建,注冊requestProcessor,啟動(dòng)NettyServer及各種task

    public boolean initialize() {
 
        //加載kv配置
        this.kvConfigManager.load();
        //初始化NettyServer
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //執(zhí)行器,用于接受請求并進(jìn)行處理
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
        //注冊Netty的處理器,即remotingExecutor
        this.registerProcessor();
        //進(jìn)行掃描未活躍的Broker的任務(wù)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        //...省略其他代碼
        return true;
    }
 
   
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            //注冊request處理器
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }  
  
 
    public void start() throws Exception {
        //啟動(dòng)netty server
        this.remotingServer.start();
 
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

NettyRequestProcessor

NettyRequestProcessor為請求處理器,上一步注冊的處理器,一般使用默認(rèn)的處理器DefaultRequestProcessor,processRequest方法處理請求。

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        //...
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            //...省略其他代碼
        }
        return null;
    }

到此這篇關(guān)于RocketMQ中的NameServer詳細(xì)解析的文章就介紹到這了,更多相關(guān)RocketMQ中的NameServer內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java多線程批量處理百萬級的數(shù)據(jù)方法示例

    java多線程批量處理百萬級的數(shù)據(jù)方法示例

    這篇文章主要介紹了java多線程批量處理百萬級的數(shù)據(jù)的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用java多線程具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2025-02-02
  • 詳解java中的byte類型

    詳解java中的byte類型

    Java也提供了一個(gè)byte數(shù)據(jù)類型,并且是基本類型。java byte是做為最小的數(shù)字來處理的,因此它的值域被定義為-128~127,也就是signed byte。下面這篇文章主要給大家介紹了關(guān)于java中byte類型的相關(guān)資料,需要的朋友可以參考下。
    2017-02-02
  • Java8的Lambda和排序

    Java8的Lambda和排序

    這篇文章主要介紹了Java8的Lambda和排序,對數(shù)組和集合進(jìn)行排序是Java 8 lambda令人驚奇的一個(gè)應(yīng)用,我們可以實(shí)現(xiàn)一個(gè)Comparators來實(shí)現(xiàn)各種排序,下面文章將有案例詳細(xì)說明,想要了解得小伙伴可以參考一下
    2021-11-11
  • 淺談java中Math.random()與java.util.random()的區(qū)別

    淺談java中Math.random()與java.util.random()的區(qū)別

    下面小編就為大家?guī)硪黄獪\談java中Math.random()與java.util.random()的區(qū)別。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2016-09-09
  • POI XSSFSheet shiftRows bug問題解決

    POI XSSFSheet shiftRows bug問題解決

    這篇文章主要介紹了POI XSSFSheet shiftRows bug問題解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-07-07
  • 通過代理類實(shí)現(xiàn)java連接數(shù)據(jù)庫(使用dao層操作數(shù)據(jù))實(shí)例分享

    通過代理類實(shí)現(xiàn)java連接數(shù)據(jù)庫(使用dao層操作數(shù)據(jù))實(shí)例分享

    java通過代理類實(shí)現(xiàn)數(shù)據(jù)庫DAO操作代碼分享,大家參考使用吧
    2013-12-12
  • SpringBoot緩存方法返回值的方法詳解

    SpringBoot緩存方法返回值的方法詳解

    如何緩存方法的返回值?應(yīng)該會(huì)有很多的辦法,這篇文章主要為大家介紹兩個(gè)比較常見并且比較容易實(shí)現(xiàn)的辦法:自定義注解和SpringCache,希望對大家有所幫助
    2023-10-10
  • spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 的聲明詳解

    spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 的聲明詳解

    這篇文章主要給大家介紹了關(guān)于spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 聲明的相關(guān)資料,需要的朋友可以參考下
    2020-08-08
  • Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式

    Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式

    這篇文章主要介紹了Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式,本文通過實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2021-07-07
  • RabbitMQ的Direct Exchange模式實(shí)現(xiàn)的消息發(fā)布案例(示例代碼)

    RabbitMQ的Direct Exchange模式實(shí)現(xiàn)的消息發(fā)布案例(示例代碼)

    本文介紹了RabbitMQ的DirectExchange模式下的消息發(fā)布和消費(fèi)的實(shí)現(xiàn),詳細(xì)說明了如何在DirectExchange模式中進(jìn)行消息的發(fā)送和接收,以及消息處理的基本方法,感興趣的朋友跟隨小編一起看看吧
    2024-09-09

最新評論