Spring Cloud Consul實現(xiàn)選舉機制的代碼工程
1.什么是Spring Cloud Consul?
Spring Cloud Consul 是 Spring Cloud 提供的對 HashiCorp Consul 的支持。它是一種基于服務(wù)網(wǎng)格的工具,用于實現(xiàn)服務(wù)注冊、發(fā)現(xiàn)、配置管理和健康檢查。 主要功能包括:
- 服務(wù)注冊與發(fā)現(xiàn):通過 Consul 的服務(wù)注冊功能,Spring Cloud Consul 可以實現(xiàn)微服務(wù)的動態(tài)注冊和發(fā)現(xiàn),簡化服務(wù)間通信。
- 分布式配置管理:通過 Consul 的 Key/Value 存儲機制,提供對分布式配置的管理。
- 健康檢查:支持服務(wù)實例的健康檢查,確保只有健康的實例可供其他服務(wù)調(diào)用。
- 選舉與分布式鎖:通過 Consul 的會話機制,支持分布式鎖和領(lǐng)導(dǎo)選舉。
Spring Cloud Consul 的選舉機制
Spring Cloud Consul 的選舉機制基于 Consul 會話(Session) 和 鍵值存儲(Key/Value Store) 實現(xiàn)分布式領(lǐng)導(dǎo)選舉。
工作原理:
- 會話創(chuàng)建:
- 服務(wù)實例向 Consul 創(chuàng)建一個會話(Session),這是一個臨時的、與實例綁定的對象。
- 會話帶有 TTL(生存時間),需要定期續(xù)約,保持活躍狀態(tài)。
- 獲取鎖(Lock):
- 通過將一個 Key 的值設(shè)置為當前會話 ID,服務(wù)嘗試獲取該 Key 的鎖。
- Consul 使用 CAS(Compare and Swap)操作來確保只有一個服務(wù)實例可以成功獲取鎖。
- 鎖定成功:
- 成功獲取鎖的服務(wù)實例被視為領(lǐng)導(dǎo)者(Leader)。
- 其他實例會定期嘗試獲取鎖,但只能等待當前鎖被釋放或超時。
- 鎖釋放或失效:
- 如果領(lǐng)導(dǎo)實例未能及時續(xù)約會話(例如宕機或網(wǎng)絡(luò)中斷),Consul 會釋放與該會話相關(guān)聯(lián)的鎖,其他實例可以競爭成為新的領(lǐng)導(dǎo)者。
2.環(huán)境搭建
run Consul Agent
docker run -d --name=dev-consul -p 8500:8500 consul
web ui
http://localhost:8500
3.代碼工程
實驗?zāi)繕?/h3>
- 使用 Consul 提供的會話機制和鍵值存儲來實現(xiàn) 分布式領(lǐng)導(dǎo)選舉。
- 通過
@InboundChannelAdapter 和 @ServiceActivator 實現(xiàn)周期性檢查領(lǐng)導(dǎo)身份并執(zhí)行領(lǐng)導(dǎo)任務(wù)。
@InboundChannelAdapter 和 @ServiceActivator 實現(xiàn)周期性檢查領(lǐng)導(dǎo)身份并執(zhí)行領(lǐng)導(dǎo)任務(wù)。pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>LeaderElection</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Cloud Starter Consul Discovery -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
</dependencies>
</project>
LeaderElectionConfig.java
package com.et;
import jakarta.annotation.PreDestroy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.web.client.RestTemplate;
@Configuration
public class LeaderElectionConfig {
private static final String LEADER_KEY = "service/leader";
private static final String CONSUL_URL = "http://localhost:8500";
private String sessionId;
@Bean
@InboundChannelAdapter(value = "leaderChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<String> leaderMessageSource() {
return () -> {
// Implement logic to check if this instance is the leader
boolean isLeader = checkLeadership();
return MessageBuilder.withPayload(isLeader ? "I am the leader" : "I am not the leader").build();
};
}
@Bean
@ServiceActivator(inputChannel = "leaderChannel")
public MessageHandler leaderMessageHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
// Implement logic to perform leader-specific tasks
}
};
}
private final RestTemplate restTemplate = new RestTemplate();
public LeaderElectionConfig() {
this.sessionId = createSession();
}
private String createSession() {
String url = CONSUL_URL + "/v1/session/create";
HttpHeaders headers = new HttpHeaders();
HttpEntity<String> entity = new HttpEntity<>("{\"Name\": \"leader-election-session\"}", headers);
//ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
// PUT
ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.PUT, entity, String.class);
// Extract session ID from response
return response.getBody().split("\"")[3]; // This is a simple way to extract the session ID
}
public boolean checkLeadership() {
String url = CONSUL_URL + "/v1/kv/" + LEADER_KEY + "?acquire=" + sessionId;
HttpHeaders headers = new HttpHeaders();
HttpEntity<String> entity = new HttpEntity<>(headers);
ResponseEntity<Boolean> response = restTemplate.exchange(url, HttpMethod.PUT, entity, Boolean.class);
return Boolean.TRUE.equals(response.getBody());
}
public void releaseLeadership() {
String url = CONSUL_URL + "/v1/kv/" + LEADER_KEY + "?release=" + sessionId;
HttpHeaders headers = new HttpHeaders();
HttpEntity<String> entity = new HttpEntity<>(headers);
ResponseEntity<Boolean> response = restTemplate.exchange(url, HttpMethod.PUT, entity, Boolean.class);
if (Boolean.TRUE.equals(response.getBody())) {
System.out.println("Released leadership successfully");
} else {
System.out.println("Failed to release leadership");
}
}
@PreDestroy
public void onExit() {
releaseLeadership();
}
}
代碼解釋
- 初始化:
- 啟動時通過
createSession()向 Consul 注冊會話。
- 啟動時通過
- 周期性任務(wù):
- 每 5 秒通過
checkLeadership()檢查領(lǐng)導(dǎo)身份。 - 如果是領(lǐng)導(dǎo)者,執(zhí)行特定任務(wù)(如打印日志、執(zhí)行業(yè)務(wù)邏輯)。
- 每 5 秒通過
- 釋放資源:
- 應(yīng)用關(guān)閉時,通過
releaseLeadership()釋放鎖。
- 應(yīng)用關(guān)閉時,通過
LeaderElectionApplication.java
package com.et;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.integration.config.EnableIntegration;
@SpringBootApplication
@EnableDiscoveryClient
@EnableIntegration
public class LeaderElectionApplication {
public static void main(String[] args) {
SpringApplication.run(LeaderElectionApplication.class, args);
}
}
配置文件
node1
server.port=8081
spring.cloud.consul.discovery.enabled=true
spring.cloud.consul.discovery.register=true
spring.application.name=leader-election-example
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
node2
server.port=8082
spring.cloud.consul.discovery.enabled=true
spring.cloud.consul.discovery.register=true
spring.application.name=leader-election-example
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
以上只是一些關(guān)鍵代碼。
4.測試
啟動node1節(jié)點
java -jar myapp.jar --spring.profiles.active=node1
啟動node2節(jié)點
java -jar myapp.jar --spring.profiles.active=node2
通過控制臺觀察日志,其中只有一臺機器能選為主機
以上就是Spring Cloud Consul實現(xiàn)選舉機制的代碼工程的詳細內(nèi)容,更多關(guān)于Spring Cloud Consul選舉機制的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java并發(fā)容器ConcurrentHashMap深入分析
這篇文章主要為大家介紹了java并發(fā)容器ConcurrentHashMap使用示例及深入分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05
Java編程實現(xiàn)對十六進制字符串異或運算代碼示例
這篇文章主要介紹了Java編程實現(xiàn)對十六進制字符串異或運算代碼示例,簡述了異或運算以及具體實例,具有一定借鑒價值,需要的朋友可以參考下。2017-12-12
關(guān)于JavaEE匿名內(nèi)部類和Lambda表達式的注意事項
這篇文章主要介紹了關(guān)于JavaEE匿名內(nèi)部類和Lambda表達式的注意事項,匿名內(nèi)部類顧名思義是沒有修飾符甚至沒有名稱的內(nèi)部類,使用匿名內(nèi)部類需要注意哪些地方,我們一起來看看吧2023-03-03
Java 多線程并發(fā)AbstractQueuedSynchronizer詳情
這篇文章主要介紹了Java 多線程并發(fā)AbstractQueuedSynchronizer詳情,文章圍繞主題展開想象的內(nèi)容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下2022-06-06

