TC?集群Seata1.6高可用架構(gòu)源碼解析
一、背景
TC 集群具有高可用架構(gòu),應(yīng)用到集群是這樣一個(gè)間接的關(guān)系:應(yīng)用 -》事務(wù)分組 -》TC 集群,應(yīng)用啟動(dòng)后所指定的事務(wù)分組不能變,可通過(guò)配置中心變更事務(wù)分組所屬的 TC 集群,Seata 客戶端監(jiān)聽(tīng)到這個(gè)變更后,會(huì)切換到新的 TC 集群。

本篇從源碼梳理這個(gè)高可用能力是如何實(shí)現(xiàn)的。
二、環(huán)境配置
客戶端配置使用nacos配置中心和nacos注冊(cè)中心,
seata:
enabled: true
# Seata 應(yīng)用編號(hào)
application-id: seataclistock
# Seata 事務(wù)組編號(hào),用于 TC 集群名。該配置需要與服務(wù)端提到的group相對(duì)應(yīng),也需要與下面的相對(duì)應(yīng)
tx-service-group: tx_group_stock
# 關(guān)閉自動(dòng)代理
enable-auto-data-source-proxy: false
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
serverAddr:
namespace: seata # 需要與服務(wù)端添加的配置文件相同
group: SEATA_GROUP_ROCKTEST
username: seata
password: seata
data-id: seataClient.tx_group_busin.properties
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: nacos
nacos:
application: seata-server
serverAddr:
namespace: seata # 需要與服務(wù)端添加的配置文件相同
group: SEATA_GROUP_ROCKTEST # 需要與服務(wù)端添加的配置文件相同
username: seata
password: seata
三、從配置中心獲取TC集群
服務(wù)注冊(cè)的能力要依賴配置中心,從nacos的配置中心獲取配置NacosConfiguration#initSeataConfig
Data Id:seataClient.tx_group_stock.properties
Group:SEATA_GROUP_LWKTEST
其中的service.vgroupMapping.tx_group_stock的值是dev_cluster_1,接下來(lái)注冊(cè)能力就要使用這個(gè)集群來(lái)工作。
private static void initSeataConfig() {
try {
String nacosDataId = getNacosDataId();
String config = configService.getConfig(nacosDataId, getNacosGroup(), DEFAULT_CONFIG_TIMEOUT);
if (StringUtils.isNotBlank(config)) {
seataConfig = ConfigProcessor.processConfig(config, getNacosDataType());
NacosListener nacosListener = new NacosListener(nacosDataId, null);
configService.addListener(nacosDataId, getNacosGroup(), nacosListener);
}
} catch (NacosException | IOException e) {
LOGGER.error("init config properties error", e);
}
}
RegistryFactory#getInstance()這是個(gè)單例機(jī)制,所以源碼梳理起來(lái)很簡(jiǎn)單,下邊獲取TC服務(wù)的時(shí)候會(huì)調(diào)用此單例方法做初始化。
- 讀取配置文件中
registry.type, - 配置的值是
nacos,所以讀出的值是nacos - 通過(guò)SPI加載并實(shí)例化
NacosRegistryProvider
public class RegistryFactory {
/**
* Gets instance.
*
* @return the instance
*/
public static RegistryService getInstance() {
return RegistryFactoryHolder.INSTANCE;
}
private static RegistryService buildRegistryService() {
RegistryType registryType;
//registryTypeName = "registry.type"
String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
try {
// nacos
registryType = RegistryType.getType(registryTypeName);
} catch (Exception exx) {
throw new NotSupportYetException("not support registry type: " + registryTypeName);
}
// 通過(guò)SPI 加載并實(shí)例化 NacosRegistryProvider
return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
}
private static class RegistryFactoryHolder {
private static final RegistryService INSTANCE = buildRegistryService();
}
}
TM、RM 客戶端需要與TC通信,所以在其初始化時(shí)必然會(huì)有獲取TC集群的邏輯,對(duì)應(yīng)在源碼TmNettyRemotingClient#init 中的reconnect方法。
@Override
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
//父類中會(huì)開(kāi)啟定時(shí)任務(wù)來(lái)執(zhí)行 getClientChannelManager().reconnect(transactionServiceGroup)
super.init();
if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
}
reconnect中的.NettyClientChannelManager#getAvailServerList 是根據(jù)seata.tx-service-group的值來(lái)檢索TC集群信息。直接提供出來(lái)調(diào)用堆棧,方便大家快速熟悉調(diào)用鏈路:
getServiceGroup:111, RegistryService (io.seata.discovery.registry) lookup:145, NacosRegistryServiceImpl (io.seata.discovery.registry.nacos) getAvailServerList:257, NettyClientChannelManager (io.seata.core.rpc.netty) reconnect:171, NettyClientChannelManager (io.seata.core.rpc.netty) init:198, TmNettyRemotingClient (io.seata.core.rpc.netty) init:47, TMClient (io.seata.tm) initClient:220, GlobalTransactionScanner (io.seata.spring.annotation) afterPropertiesSet:512, GlobalTransactionScanner (io.seata.spring.annotation)
這里便是通過(guò)Seata客戶端 seata.tx-service-group的值,找到最終TC集群的關(guān)鍵之處。在getServiceGroup中從nacos中獲取service.vgroupMapping.tx_group_stock的值,即dev_cluster_1
default String getServiceGroup(String key) {
//key = service.vgroupMapping.tx_group_stock
key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
if (!SERVICE_GROUP_NAME.contains(key)) {
ConfigurationCache.addConfigListener(key);
SERVICE_GROUP_NAME.add(key);
}
return ConfigurationFactory.getInstance().getConfig(key);
}
然后NettyClientChannelManager#reconnect中獲取 TC 集群中的所有 TC 服務(wù)節(jié)點(diǎn),對(duì)每個(gè)TC 服務(wù)節(jié)點(diǎn)建連。
for (String serverAddress : availList) {
try {
acquireChannel(serverAddress);
channelAddress.add(serverAddress);
} catch (Exception e) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
serverAddress, e.getMessage(), e);
}
}
再梳理一下,梳理這么多的關(guān)鍵就是通過(guò)tx_group_stock 找到 TC 集群 dev_cluster_1
客戶端:
seata: # 默認(rèn)關(guān)閉,如需啟用spring.datasource.dynami.seata需要同時(shí)開(kāi)啟 enabled: true # Seata 事務(wù)組編號(hào),用于 TC 集群名。該配置需要與服務(wù)端提到的group相對(duì)應(yīng),也需要與下面的相對(duì)應(yīng) tx-service-group: tx_group_stock
nacos:
Data ID: seataClient.tx_group_stock.properties
Group: SEATA_GROUP_ROCKTEST
配置內(nèi)容:
...
service.vgroupMapping.tx_group_stock=dev_cluster_1
...
TC服務(wù)端:
registry:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: nacos
preferred-networks: 30.240.*
nacos:
application: seata-server
cluster: dev_cluster_1
RegistryService#getServiceGroup中從nacos獲取值的時(shí)候,有個(gè)細(xì)節(jié)需要注意:通過(guò)namespace + Group + Key(Data Id) 三維來(lái)唯一標(biāo)示一個(gè)Key。
四、刷新TC集群
AbstractNettyRemotingClient#init中默認(rèn)會(huì)每隔10s進(jìn)行一次 TC 服務(wù)清單刷新與重連
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);以上就是TC 集群Seata1.6高可用架構(gòu)源碼解析的詳細(xì)內(nèi)容,更多關(guān)于TC 集群Seata高可用架構(gòu)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java中l(wèi)ist.forEach()和list.stream().forEach()區(qū)別
這篇文章主要介紹了java中l(wèi)ist.forEach()和list.stream().forEach()區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
Java循環(huán)終止的實(shí)現(xiàn)方式總結(jié)
循環(huán)是一種重復(fù)執(zhí)行一段代碼的結(jié)構(gòu),Java提供了四種主要的循環(huán)結(jié)構(gòu),本文主要來(lái)和大家介紹一下Java循環(huán)終止的實(shí)現(xiàn)方式,有需要的小伙伴可以參考一下2023-10-10
Servlet服務(wù)端實(shí)現(xiàn)原理詳解
Servlet是Sun公司開(kāi)發(fā)動(dòng)態(tài)web的一門(mén)技術(shù),Sun公司在這些API中提供了一個(gè)接口叫做:Servlet,如果想開(kāi)發(fā)一個(gè)Servlet程序,只需要完成兩個(gè)小步驟:編寫(xiě)一個(gè)類,實(shí)現(xiàn)Servlet接口、把開(kāi)發(fā)好的Java類部署到web服務(wù)器中。但是你了解Servlet實(shí)現(xiàn)的原理嗎2022-07-07
Spring內(nèi)置任務(wù)調(diào)度如何實(shí)現(xiàn)添加、取消與重置詳解
任務(wù)調(diào)度是我們?nèi)粘i_(kāi)發(fā)中經(jīng)常會(huì)碰到的,下面這篇文章主要給大家介紹了關(guān)于Spring內(nèi)置任務(wù)調(diào)度如何實(shí)現(xiàn)添加、取消與重置的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10
實(shí)例講解Java批量插入、更新數(shù)據(jù)
這片文章介紹了一個(gè)Java批量添加數(shù)據(jù),多個(gè)字段同時(shí)添加多條數(shù)據(jù)具體實(shí)例,面向的是Oracle數(shù)據(jù)庫(kù),需要的朋友可以參考下2015-07-07
Java開(kāi)發(fā)HashMap?key必須實(shí)現(xiàn)hashCode?equals方法原理
這篇文章主要為大家介紹了Java開(kāi)發(fā)HashMap?key必須實(shí)現(xiàn)hashCode?equals方法原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
Java使用agent實(shí)現(xiàn)main方法之前的實(shí)例詳解
這篇文章主要介紹了Java使用agent實(shí)現(xiàn)main方法之前的實(shí)例詳解的相關(guān)資料,希望通過(guò)本文能幫助到大家,讓大家理解這部分內(nèi)容,需要的朋友可以參考下2017-10-10

