亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RocketMQ源碼解析broker?啟動流程

 更新時間:2023年03月23日 11:09:25   作者:hsfxuebao  
這篇文章主要為大家介紹了RocketMQ源碼解析broker啟動流程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

1. 啟動入口

本系列RocketMQ4.8注釋github地址,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈

前面我們已經(jīng)分析完了NameServerproducer,從本文開始,我們將分析Broker。

broker的啟動類為org.apache.rocketmq.broker.BrokerStartup,代碼如下:

public class BrokerStartup {
    ...
    public static void main(String[] args) {
        start(createBrokerController(args));
    }
    ...
}

main()方法中,僅有一行代碼,這行代碼包含了兩個操作:

  • createBrokerController(...):創(chuàng)建BrokerController
  • start(...):啟動Broker

接下來我們就來分析這兩個操作。

2. 創(chuàng)建BrokerController

創(chuàng)建BrokerController的方法為BrokerStartup#createBrokerController,代碼如下:

/**
 * 創(chuàng)建 broker 的配置參數(shù)
 */
public static BrokerController createBrokerController(String[] args) {
    ...
    try {
        //解析命令行參數(shù)
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }
        // 處理配置
        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
        // tls安全相關(guān)
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // 配置端口
        nettyServerConfig.setListenPort(10911);
        // 消息存儲的配置
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        ...
        // 將命令行中的配置設(shè)置到brokerConfig對象中
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
        // 檢查環(huán)境變量:ROCKETMQ_HOME
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match 
                the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        //省略一些配置
        ...
        // 創(chuàng)建 brokerController
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        controller.getConfiguration().registerConfig(properties);
        // 初始化
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // 關(guān)閉鉤子,在關(guān)閉前處理一些操作
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);
            @Override
            public void run() {
                synchronized (this) {
                    if (!this.hasShutdown) {
                        ...
                        // 這里會發(fā)一條注銷消息給nameServer
                        controller.shutdown();
                        ...
                    }
                }
            }
        }, "ShutdownHook"));
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

這個方法的代碼有點長,但功能并不多,總的來說就三個功能:

  • 處理配置:主要是處理nettyServerConfignettyClientConfig的配置,這塊就是一些配置解析的操作,處理方式與NameServer很類似,這里就不多說了。
  • 創(chuàng)建及初始化controller:調(diào)用方法controller.initialize(),這塊內(nèi)容我們后面分析。
  • 注冊關(guān)閉鉤子:調(diào)用Runtime.getRuntime().addShutdownHook(...),可以在jvm進程關(guān)閉前進行一些操作。

2.1 controller實例化

BrokerController的創(chuàng)建及初始化是在BrokerStartup#createBrokerController方法中進行,我們先來看看它的構(gòu)造方法:

public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    // 4個核心配置信息
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // 管理consumer消費消息的offset
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // 管理topic配置
    this.topicConfigManager = new TopicConfigManager(this);
    // 處理 consumer 拉消息請求的
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    // 消息送達的監(jiān)聽器
    this.messageArrivingListener 
        = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    ...
    // 往外發(fā)消息的組件
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    ...
}

BrokerController的構(gòu)造方法很長,基本都是一些賦值操作,代碼中已列出關(guān)鍵項,這些包括:

  • 核心配置賦值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四個配置
  • ConsumerOffsetManager:管理consumer消費消息位置的偏移量,偏移量表示消費者組消費該topic消息的位置,后面再消費時,就從該位置后消費,避免重復消費消息,也避免了漏消費消息。
  • topicConfigManagertopic配置管理器,就是用來管理topic配置的,如topic名稱,topic隊列數(shù)量
  • pullMessageProcessor:消息處理器,用來處理消費者拉消息
  • messageArrivingListener:消息送達的監(jiān)聽器,當生產(chǎn)者的消息送達時,由該監(jiān)聽器監(jiān)聽
  • brokerOuterAPI:往外發(fā)消息的組件,如向NameServer發(fā)送注冊/注銷消息

以上這些組件的用處,這里先混個臉熟,我們后面再分析。

2.2 初始化controller

我們再來看看初始化操作,方法為BrokerController#initialize

public boolean initialize() throws CloneNotSupportedException {
    // 加載配置文件中的配置
    boolean result = this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();
    if (result) {
        try {
            // 消息存儲管理組件,管理磁盤上的消息
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                    this.messageArrivingListener, this.brokerConfig);
            // 啟用了DLeger,就創(chuàng)建DLeger相關(guān)組件
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                ...
            }
            // broker統(tǒng)計組件
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, 
                brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(
                new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    // 加載磁盤上的記錄,如commitLog寫入的位置、消費者主題/隊列的信息
    result = result && this.messageStore.load();
    if (result) {
        // 處理 nettyServer
        this.remotingServer = new NettyRemotingServer(
            this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(
            fastConfig, this.clientHousekeepingService);
        // 創(chuàng)建線程池start... 這里會創(chuàng)建多種類型的線程池
        ...
        // 處理consumer pull操作的線程池
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        ...
        // 創(chuàng)建線程池end...
        // 注冊處理器
        this.registerProcessor();
        // 啟動定時任務(wù)start... 這里會啟動好多的定時任務(wù)
        ...
        // 定時將consumer消費到的offset進行持久化操作,即將數(shù)據(jù)保存到磁盤上
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        ...
        // 啟動定時任務(wù)end...
        ...
        // 開啟 DLeger 的一些操作
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            ...
        }
        // 處理tls配置
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            ...
        }
        // 初始化一些操作
        initialTransaction();
        initialAcl();
        initialRpcHooks();
    }
    return result;
}

這個還是很長,關(guān)鍵部分都做了注釋,該方法所做的工作如下:

  • 加載配置文件中的配置
  • 賦值與初始化操作
  • 創(chuàng)建線程池
  • 注冊處理器
  • 啟動定時任務(wù)

這里我們來看下注冊處理器的操作this.registerProcessor():

2.2.1 注冊處理器:BrokerController#registerProcessor

this.registerProcessor()實際調(diào)用的方法是BrokerController#registerProcessor,代碼如下:

public void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, 
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, 
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, 
        this.sendMessageExecutor);
    ...
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, 
        this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    /**
        * ReplyMessageProcessor
        */
    ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
    ...
}

這個方法里注冊了許許多多的處理器,這里僅列出了與消息相關(guān)的內(nèi)容,如發(fā)送消息、回復消息、拉取消息等,后面在處理producer/consumer的消息時,就會用到這些處理器,這里先不展開分析。

2.2.2 remotingServer注冊處理器:NettyRemotingServer#registerProcessor

我們來看下remotingServer注冊處理器的操作,方法為NettyRemotingServer#registerProcessor

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    ...
    @Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, 
            ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }
        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, 
                ExecutorService>(processor, executorThis);
        // 注冊到processorTable 中
        this.processorTable.put(requestCode, pair);
    }
    ...
}

最終,這些處理器注冊到了processorTable中,它是NettyRemotingAbstract的成員變量,定義如下:

HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

這是一個hashMap的結(jié)構(gòu),keycode,valuePair,該類中有兩個成員變量:NettyRequestProcessor、ExecutorService,codeNettyRequestProcessor的映射關(guān)系就是在hashMap里存儲的。

2.3 注冊關(guān)閉鉤子:Runtime.getRuntime().addShutdownHook(...)

接著我們來看看注冊關(guān)閉鉤子的操作:

// 關(guān)閉鉤子,在關(guān)閉前處理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    @Override
    public void run() {
        synchronized (this) {
            if (!this.hasShutdown) {
                ...
                // 這里會發(fā)一條注銷消息給nameServer
                controller.shutdown();
                ...
            }
        }
    }
}, "ShutdownHook"));

跟進BrokerController#shutdown方法:

public void shutdown() {
    // 調(diào)用各組件的shutdown方法
    ...
    // 發(fā)送注銷消息到NameServer
    this.unregisterBrokerAll();
    ...
    // 持久化consumer的消費偏移量
    this.consumerOffsetManager.persist();
    // 又是調(diào)用各組件的shutdown方法
    ...

這個方法里會調(diào)用各組件的shutdown()方法、發(fā)送注銷消息給NameServer、持久化consumer的消費偏移量,這里我們主要看發(fā)送注銷消息的方法BrokerController#unregisterBrokerAll:

private void unregisterBrokerAll() {
    // 發(fā)送一條注銷消息給nameServer
    this.brokerOuterAPI.unregisterBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId());
}

繼續(xù)進入BrokerOuterAPI#unregisterBrokerAll

public void unregisterBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) {
    // 獲取所有的 nameServer,遍歷發(fā)送注銷消息
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
            } catch (Exception e) {
                log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
            }
        }
    }
}

這個方法里,會獲取到所有的nameServer,然后逐個發(fā)送注銷消息,繼續(xù)進入BrokerOuterAPI#unregisterBroker方法:

public void unregisterBroker(
    final String namesrvAddr,
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, 
        InterruptedException, MQBrokerException {
    UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    // 發(fā)送的注銷消息:RequestCode.UNREGISTER_BROKER
    RemotingCommand request = RemotingCommand.createRequestCommand(
            c, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}

最終調(diào)用的是RemotingClient#invokeSync進行消息發(fā)送,請求codeRequestCode.UNREGISTER_BROKER,這就與NameServer接收broker的注銷消息對應(yīng)上了。

3. 啟動Broker:start(...)

我們再來看看Broker的啟動流程,處理方法為BrokerController#start

public void start() throws Exception {
    // 啟動各組件
    // 啟動消息存儲相關(guān)組件
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    // 啟動 remotingServer,其實就是啟動一個netty服務(wù),用來接收producer傳來的消息
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    ...
    // broker對外發(fā)放消息的組件,向nameServer上報存活消息時使用了它,也是一個netty服務(wù)
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    ...
    // broker 核心的心跳注冊任務(wù)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, 
                    brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
        // brokerConfig.getRegisterNameServerPeriod() 值為 1000 * 30,最終計算得到默認30秒執(zhí)行一次
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
            TimeUnit.MILLISECONDS);
    ...
}

這個方法主要就是啟動各組件了,這里列出了幾大重要組件的啟動:

  • messageStore:消息存儲組件,在這個組件里,會啟動消息存儲相關(guān)的線程,如消息的投遞操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等
  • remotingServernetty服務(wù),用來接收請求消息,如producer發(fā)送過來的消息
  • brokerOuterAPI:也是一個netty服務(wù),用來對外發(fā)送消息,如向nameServer上報心跳消息
  • 啟動定時任務(wù):brokernameServer發(fā)送注冊消息

這里我們重點來看定時任務(wù)是如何發(fā)送心跳發(fā)送的。

處理注冊消息發(fā)送的時間間隔如下:

Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

這行代碼看著長,但意思就一句話:時間間隔可以自行配置,但不能小于10s,不能大于60s,默認是30s.

處理消息注冊的方法為BrokerController#registerBrokerAll(...),代碼如下:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
        boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper 
            = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    // 處理topic相關(guān)配置
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ...
    }
    // 這里會判斷是否需要進行注冊
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 進行注冊操作    
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

這個方法就是用來處理注冊操作的,不過注冊前會先驗證下是否需要注冊,驗證是否需要注冊的方法為BrokerController#needRegister, 代碼如下:

private boolean needRegister(final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final int timeoutMills) {
    TopicConfigSerializeWrapper topicConfigWrapper 
        = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    // 判斷是否需要進行注冊
    List&lt;Boolean&gt; changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, 
        brokerId, topicConfigWrapper, timeoutMills);
    // 有一個發(fā)生了變化,就表示需要注冊了    
    boolean needRegister = false;
    for (Boolean changed : changeList) {
        if (changed) {
            needRegister = true;
            break;
        }
    }
    return needRegister;
}

這個方法調(diào)用了brokerOuterAPI.needRegister(...)來判斷broker是否發(fā)生了變化,只要一個NameServer上發(fā)生了變化,就說明需要進行注冊操作。

brokerOuterAPI.needRegister(...)是如何判斷broker是否發(fā)生了變化的呢?繼續(xù)跟進BrokerOuterAPI#needRegister

public List<Boolean> needRegister(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final int timeoutMills) {
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    // 獲取所有的 nameServer
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        // 遍歷所有的nameServer,逐一發(fā)送請求
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueryDataVersionRequestHeader requestHeader 
                            = new QueryDataVersionRequestHeader();
                        ...
                        // 向nameServer發(fā)送消息,命令是 RequestCode.QUERY_DATA_VERSION
                        RemotingCommand request = RemotingCommand
                            .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                        // 把當前的 DataVersion 發(fā)到 nameServer     
                        request.setBody(topicConfigWrapper.getDataVersion().encode());
                        // 發(fā)請求到nameServer
                        RemotingCommand response = remotingClient
                            .invokeSync(namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                  (QueryDataVersionResponseHeader) response
                                  .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    // 拿到 DataVersion
                                    nameServerDataVersion = DataVersion.decode(body, D
                                        ataVersion.class);
                                    // 這里是判斷的關(guān)鍵
                                    if (!topicConfigWrapper.getDataVersion()
                                        .equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                            }
                            default:
                                break;
                        }
                        ...
                    } catch (Exception e) {
                        ...
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
    }
    return changedList;
}

這個方法里,先是遍歷所有的nameServer,向每個nameServer都發(fā)送一條codeRequestCode.QUERY_DATA_VERSION的參數(shù),參數(shù)為當前brokerDataVersion,當nameServer收到消息后,就返回nameServer中保存的、與當前broker對應(yīng)的DataVersion,當兩者版本不相等時,就表明當前broker發(fā)生了變化,需要重新注冊。

DataVersion是個啥呢?它的部分代碼如下:

public class DataVersion extends RemotingSerializable {
    // 時間戳
    private long timestamp = System.currentTimeMillis();
    // 計數(shù)器,可以理解為最近的版本號
    private AtomicLong counter = new AtomicLong(0);
    public void nextVersion() {
        this.timestamp = System.currentTimeMillis();
        this.counter.incrementAndGet();
    }
    /**
     * equals 方法,當 timestamp 與 counter 都相等時,則兩者相等
     */
    @Override
    public boolean equals(final Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        final DataVersion that = (DataVersion) o;
        if (timestamp != that.timestamp) {
            return false;
        }
        if (counter != null && that.counter != null) {
            return counter.longValue() == that.counter.longValue();
        }
        return (null == counter) && (null == that.counter);
    }
    ...
} 

DataVersionequals()方法來看,只有當timestampcounter都相等時,兩個DataVersion對象才相等。那這兩個值會在哪里被修改呢?從DataVersion#nextVersion方法的調(diào)用情況來看,引起這兩個值的變化主要有兩種:

  • broker 上新創(chuàng)建了一個 topic
  • topic的發(fā)了的變化

在這兩種情況下,DataVersion#nextVersion方法被調(diào)用,從而引起DataVersion的改變。DataVersion改變了,就表明當前broker需要向nameServer注冊了。

讓我們再回到BrokerController#registerBrokerAll(...)方法:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
        boolean oneway, boolean forceRegister) {
    ...
    // 這里會判斷是否需要進行注冊
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 進行注冊操作    
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

處理注冊的方法為BrokerController#doRegisterBrokerAll,稍微看下它的流程:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
    // 注冊
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        // 這個對象里就包含了當前broker的版本信息
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());
    ...
}

繼續(xù)跟下去,最終調(diào)用的是BrokerOuterAPI#registerBroker方法:

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, 
    RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
    // 構(gòu)建請求
    RemotingCommand request = RemotingCommand
        .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
    // 處理發(fā)送操作:sendOneWay
    if (oneway) {
        try {
            // 注冊操作
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
        ...
    }
    ....
}

所以,所謂的注冊操作,就是當nameServer發(fā)送一條codeRequestCode.REGISTER_BROKER的消息,消息里會帶上當前brokertopic信息、版本號等。

4.總結(jié)

本文主要分析了broker的啟動流程,總的來說,啟動流程分為3個:

  • 解析配置文件,這一步會解析各種配置,并將其賦值到對應(yīng)的對象中
  • BrokerController創(chuàng)建及初始化:創(chuàng)建了BrokerController對象,并進行初始化操作,所謂的初始化,就是加載配置文件中配置、創(chuàng)建線程池、注冊請求處理器、啟動定時任務(wù)等
  • BrokerController啟動:這一步是啟動broker的核心組件,如messageStore(消息存儲)、remotingServer(netty服務(wù),用來處理producerconsumer請求)、brokerOuterAPI(netty服務(wù),用來向nameServer上報當前broker信息)等。

在分析啟動過程中,重點分析了兩類消息的發(fā)送:

  • ShutdownHook中,broker會向nameServer發(fā)送注銷消息,這表明在broker關(guān)閉前,nameServer會清除當前broker的注冊信息

  • broker啟動后,會啟動一個定時任務(wù),定期判斷是否需要向nameServer注冊,判斷是否需要注冊時,會向nameServer發(fā)送codeQUERY_DATA_VERSION的消息,從nameServer得到當前broker的版本號,該版本號與本地版本號不一致,就表示需要向broker重新注冊了,即發(fā)送注冊消息。

參考文章

RocketMQ4.8注釋github地址

以上就是RocketMQ源碼解析broker 啟動流程的詳細內(nèi)容,更多關(guān)于RocketMQ broker啟動的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 解決idea創(chuàng)建版本時只有Java21和Java17選項

    解決idea創(chuàng)建版本時只有Java21和Java17選項

    你是否在使用IntelliJ?IDEA創(chuàng)建新項目時遇到了只有Java?21和Java?17的選項?別擔心,我們的指南將為你提供解決方案,通過簡單的步驟,你將能夠選擇你需要的任何Java版本,繼續(xù)閱讀,讓我們開始吧!
    2024-03-03
  • springboot中的pom文件?project報錯問題

    springboot中的pom文件?project報錯問題

    這篇文章主要介紹了springboot中的pom文件?project報錯問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Java日期與時間類原理解析

    Java日期與時間類原理解析

    這篇文章主要介紹了Java日期與時間類原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • 使用java實現(xiàn)備份和恢復SQLServer表數(shù)據(jù)

    使用java實現(xiàn)備份和恢復SQLServer表數(shù)據(jù)

    這篇文章主要為大家詳細介紹了如何使用java實現(xiàn)備份和恢復SQLServer表數(shù)據(jù),文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下
    2024-01-01
  • Java使用Arrays.asList報UnsupportedOperationException的解決

    Java使用Arrays.asList報UnsupportedOperationException的解決

    這篇文章主要介紹了Java使用Arrays.asList報UnsupportedOperationException的解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-04-04
  • Java Scala之模式匹配與隱式轉(zhuǎn)換

    Java Scala之模式匹配與隱式轉(zhuǎn)換

    在Java中我們有switch case default這三個組成的基礎(chǔ)語法,在Scala中我們是有match和case組成 default的作用由case代替,本文詳細介紹了Scala的模式匹配與隱式轉(zhuǎn)換,感興趣的可以參考本文
    2023-04-04
  • Java實現(xiàn)布隆過濾器的示例詳解

    Java實現(xiàn)布隆過濾器的示例詳解

    布隆過濾器(Bloom?Filter)是1970年由布隆提出來的,實際上是由一個很長的二進制數(shù)組+一系列hash算法映射函數(shù),用于判斷一個元素是否存在于集合中。本文主要介紹了Java實現(xiàn)布隆過濾器的示例代碼,希望對大家有所幫助
    2023-03-03
  • springmvc實現(xiàn)自定義類型轉(zhuǎn)換器示例

    springmvc實現(xiàn)自定義類型轉(zhuǎn)換器示例

    本篇文章主要介紹了springmvc實現(xiàn)自定義類型轉(zhuǎn)換器示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-02-02
  • spring Boot查詢數(shù)據(jù)分頁顯示的方法實例

    spring Boot查詢數(shù)據(jù)分頁顯示的方法實例

    這篇文章主要給大家介紹了關(guān)于spring Boot查詢數(shù)據(jù)分頁顯示的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用spring Boot具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2020-08-08
  • springboot使用AOP+反射實現(xiàn)Excel數(shù)據(jù)的讀取

    springboot使用AOP+反射實現(xiàn)Excel數(shù)據(jù)的讀取

    本文主要介紹了springboot使用AOP+反射實現(xiàn)Excel數(shù)據(jù)的讀取,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-01-01

最新評論