Java實(shí)現(xiàn)心跳機(jī)制的方法
一、心跳機(jī)制簡(jiǎn)介
在分布式系統(tǒng)中,分布在不同主機(jī)上的節(jié)點(diǎn)需要檢測(cè)其他節(jié)點(diǎn)的狀態(tài),如服務(wù)器節(jié)點(diǎn)需要檢測(cè)從節(jié)點(diǎn)是否失效。為了檢測(cè)對(duì)方節(jié)點(diǎn)的有效性,每隔固定時(shí)間就發(fā)送一個(gè)固定信息給對(duì)方,對(duì)方回復(fù)一個(gè)固定信息,如果長(zhǎng)時(shí)間沒(méi)有收到對(duì)方的回復(fù),則斷開與對(duì)方的連接。
發(fā)包方既可以是服務(wù)端,也可以是客戶端,這要看具體實(shí)現(xiàn)。因?yàn)槭敲扛艄潭〞r(shí)間發(fā)送一次,類似心跳,所以發(fā)送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據(jù)具體實(shí)現(xiàn)。心跳包主要應(yīng)用于長(zhǎng)連接的保持與短線鏈接。
一般而言,應(yīng)該客戶端主動(dòng)向服務(wù)器發(fā)送心跳包,因?yàn)榉?wù)器向客戶端發(fā)送心跳包會(huì)影響服務(wù)器的性能。
二、心跳機(jī)制實(shí)現(xiàn)方式
心跳機(jī)制有兩種實(shí)現(xiàn)方式,一種基于TCP自帶的心跳包,TCP的SO_KEEPALIVE選項(xiàng)可以,系統(tǒng)默認(rèn)的默認(rèn)跳幀頻率為2小時(shí),超過(guò)2小時(shí)后,本地的TCP 實(shí)現(xiàn)會(huì)發(fā)送一個(gè)數(shù)據(jù)包給遠(yuǎn)程的 Socket. 如果遠(yuǎn)程Socket 沒(méi)有發(fā)回響應(yīng), TCP實(shí)現(xiàn)就會(huì)持續(xù)嘗試 11 分鐘, 直到接收到響應(yīng)為止。 否則就會(huì)自動(dòng)斷開Socket連接。但TCP自帶的心跳包無(wú)法檢測(cè)比較敏感地知道對(duì)方的狀態(tài),默認(rèn)2小時(shí)的空閑時(shí)間,對(duì)于大多數(shù)的應(yīng)用而言太長(zhǎng)了??梢允止ら_啟KeepAlive功能并設(shè)置合理的KeepAlive參數(shù)。
另一種在應(yīng)用層自己進(jìn)行實(shí)現(xiàn),基本步驟如下:
Client使用定時(shí)器,不斷發(fā)送心跳;
Server收到心跳后,回復(fù)一個(gè)包;
Server為每個(gè)Client啟動(dòng)超時(shí)定時(shí)器,如果在指定時(shí)間內(nèi)沒(méi)有收到Client的心跳包,則Client失效。
三、Java實(shí)現(xiàn)心跳機(jī)制
這里基于Java實(shí)現(xiàn)的簡(jiǎn)單RPC框架實(shí)現(xiàn)心跳機(jī)制。Java實(shí)現(xiàn)代碼如下所示:
心跳客戶端類:
public class HeartbeatClient implements Runnable {
private String serverIP = "127.0.0.1";
private int serverPort = 8089;
private String nodeID = UUID.randomUUID().toString();
private boolean isRunning = true;
// 最近的心跳時(shí)間
private long lastHeartbeat;
// 心跳間隔時(shí)間
private long heartBeatInterval = 10 * 1000;
public void run() {
try {
while (isRunning) {
HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));
long startTime = System.currentTimeMillis();
// 是否達(dá)到發(fā)送心跳的周期時(shí)間
if (startTime - lastHeartbeat > heartBeatInterval) {
System.out.println("send a heart beat");
lastHeartbeat = startTime;
HeartbeatEntity entity = new HeartbeatEntity();
entity.setTime(startTime);
entity.setNodeID(nodeID);
// 向服務(wù)器發(fā)送心跳,并返回需要執(zhí)行的命令
Cmder cmds = handler.sendHeartBeat(entity);
if (!processCommand(cmds))
continue;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private boolean processCommand(Cmder cmds) {
// ...
return true;
}
}
心跳包實(shí)體類:
public class HeartbeatEntity implements Serializable {
private long time;
private String nodeID;
private String error;
private Map<String, Object> info = new HashMap<String, Object>();
public String getNodeID() {
return nodeID;
}
public void setNodeID(String nodeID) {
this.nodeID = nodeID;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Map<String, Object> getInfo() {
return info;
}
public void setInfo(Map<String, Object> info) {
this.info = info;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
}
服務(wù)器接受心跳包返回的命令對(duì)象類:
public class Cmder implements Serializable {
private String nodeID;
private String error;
private Map<String, Object> info = new HashMap<String, Object>();
public String getNodeID() {
return nodeID;
}
public void setNodeID(String nodeID) {
this.nodeID = nodeID;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Map<String, Object> getInfo() {
return info;
}
public void setInfo(Map<String, Object> info) {
this.info = info;
}
}
RPC服務(wù)注冊(cè)中心:
public class ServiceCenter {
private ExecutorService executor = Executors.newFixedThreadPool(20);
private final ConcurrentHashMap<String, Class> serviceRegistry = new ConcurrentHashMap<String, Class>();
private AtomicBoolean isRunning = new AtomicBoolean(true);
// 服務(wù)器監(jiān)聽(tīng)端口
private int port = 8089;
// 心跳監(jiān)聽(tīng)器
HeartbeatLinstener linstener;
// 單例模式
private static class SingleHolder {
private static final ServiceCenter INSTANCE = new ServiceCenter();
}
private ServiceCenter() {
}
public static ServiceCenter getInstance() {
return SingleHolder.INSTANCE;
}
public void register(Class serviceInterface, Class impl) {
System.out.println("regeist service " + serviceInterface.getName());
serviceRegistry.put(serviceInterface.getName(), impl);
}
public void start() throws IOException {
ServerSocket server = new ServerSocket();
server.bind(new InetSocketAddress(port));
System.out.println("start server");
linstener = HeartbeatLinstener.getInstance();
System.out.println("start listen heart beat");
try {
while (true) {
// 1.監(jiān)聽(tīng)客戶端的TCP連接,接到TCP連接后將其封裝成task,由線程池執(zhí)行
executor.execute(new ServiceTask(server.accept()));
}
} finally {
server.close();
}
}
public void stop() {
isRunning.set(false);
executor.shutdown();
}
public boolean isRunning() {
return isRunning.get();
}
public int getPort() {
return port;
}
public void settPort(int port) {
this.port = port;
}
public ConcurrentHashMap<String, Class> getServiceRegistry() {
return serviceRegistry;
}
private class ServiceTask implements Runnable {
Socket clent = null;
public ServiceTask(Socket client) {
this.clent = client;
}
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
// 2.將客戶端發(fā)送的碼流反序列化成對(duì)象,反射調(diào)用服務(wù)實(shí)現(xiàn)者,獲取執(zhí)行結(jié)果
input = new ObjectInputStream(clent.getInputStream());
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Class serviceClass = serviceRegistry.get(serviceName);
if (serviceClass == null) {
throw new ClassNotFoundException(serviceName + " not found");
}
Method method = serviceClass.getMethod(methodName, parameterTypes);
Object result = method.invoke(serviceClass.newInstance(), arguments);
// 3.將執(zhí)行結(jié)果反序列化,通過(guò)socket發(fā)送給客戶端
output = new ObjectOutputStream(clent.getOutputStream());
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (output != null) {
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (input != null) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (clent != null) {
try {
clent.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
心跳監(jiān)聽(tīng)類:
package com.cang.heartbeat;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 心跳監(jiān)聽(tīng)保存信息
*
* @author cang
* @create_time 2016-09-28 11:40
*/
public class HeartbeatLinstener {
private ExecutorService executor = Executors.newFixedThreadPool(20);
private final ConcurrentHashMap<String, Object> nodes = new ConcurrentHashMap<String, Object>();
private final ConcurrentHashMap<String, Long> nodeStatus = new ConcurrentHashMap<String, Long>();
private long timeout = 10 * 1000;
// 服務(wù)器監(jiān)聽(tīng)端口
private int port = 8089;
// 單例模式
private static class SingleHolder {
private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();
}
private HeartbeatLinstener() {
}
public static HeartbeatLinstener getInstance() {
return SingleHolder.INSTANCE;
}
public ConcurrentHashMap<String, Object> getNodes() {
return nodes;
}
public void registerNode(String nodeId, Object nodeInfo) {
nodes.put(nodeId, nodeInfo);
nodeStatus.put(nodeId, System.currentTimeMillis());
}
public void removeNode(String nodeID) {
if (nodes.containsKey(nodeID)) {
nodes.remove(nodeID);
}
}
// 檢測(cè)節(jié)點(diǎn)是否有效
public boolean checkNodeValid(String key) {
if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;
if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;
return true;
}
// 刪除所有失效節(jié)點(diǎn)
public void removeInValidNode() {
Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Long> e = it.next();
if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {
nodes.remove(e.getKey());
}
}
}
}
心跳處理類接口:
public interface HeartbeatHandler {
public Cmder sendHeartBeat(HeartbeatEntity info);
}
心跳處理實(shí)現(xiàn)類:
public class HeartbeatHandlerImpl implements HeartbeatHandler {
public Cmder sendHeartBeat(HeartbeatEntity info) {
HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();
// 添加節(jié)點(diǎn)
if (!linstener.checkNodeValid(info.getNodeID())) {
linstener.registerNode(info.getNodeID(), info);
}
// 其他操作
Cmder cmder = new Cmder();
cmder.setNodeID(info.getNodeID());
// ...
System.out.println("current all the nodes: ");
Map<String, Object> nodes = linstener.getNodes();
for (Map.Entry e : nodes.entrySet()) {
System.out.println(e.getKey() + " : " + e.getValue());
}
System.out.println("hadle a heartbeat");
return cmder;
}
}
測(cè)試類:
public class HeartbeatTest {
public static void main(String[] args) {
new Thread(new Runnable() {
public void run() {
try {
ServiceCenter serviceServer = ServiceCenter.getInstance();
serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);
serviceServer.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
Thread client1 = new Thread(new HeartbeatClient());
client1.start();
Thread client2 = new Thread(new HeartbeatClient());
client2.start();
}
}
四、總結(jié)
上面的代碼還有很多不足的地方,希望有空能進(jìn)行改善:
- 配置為硬編碼;
- 命令類Cmder沒(méi)有實(shí)際實(shí)現(xiàn),返回的Cmder對(duì)象沒(méi)有實(shí)際進(jìn)行處理;
其他小問(wèn)題就暫時(shí)不管了,希望以后能重寫上面的代碼。
以上就是Java實(shí)現(xiàn)心跳機(jī)制的方法的詳細(xì)內(nèi)容,更多關(guān)于Java實(shí)現(xiàn)心跳機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA遠(yuǎn)程部署調(diào)試Java應(yīng)用程序的詳細(xì)流程
這篇文章主要介紹了IDEA遠(yuǎn)程部署調(diào)試Java應(yīng)用程序,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-10-10
SpringBoot中使用Servlet的兩種方式小結(jié)
這篇文章主要介紹了SpringBoot中使用Servlet的兩種方式小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
springboot項(xiàng)目配置多數(shù)據(jù)庫(kù)連接的示例詳解
這篇文章主要介紹了springboot項(xiàng)目配置多數(shù)據(jù)庫(kù)連接的示例,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-12-12
使用spring整合Quartz實(shí)現(xiàn)—定時(shí)器功能
這篇文章主要介紹了使用spring整合Quartz實(shí)現(xiàn)—定時(shí)器功能,不基于特定的基類的方法,需要的朋友可以參考下2018-04-04
SpringBoot整合Mybatis-Plus、Jwt實(shí)現(xiàn)登錄token設(shè)置
Spring Boot整合Mybatis-plus實(shí)現(xiàn)登錄常常需要使用JWT來(lái)生成用戶的token并設(shè)置用戶權(quán)限的攔截器,本文就來(lái)詳細(xì)的介紹一下,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02
使用Java創(chuàng)建數(shù)據(jù)透視表并導(dǎo)出為PDF的方法
數(shù)據(jù)透視分析是一種強(qiáng)大的工具,可以幫助我們從大量數(shù)據(jù)中提取有用信息并進(jìn)行深入分析,本文將介紹如何使用Java來(lái)構(gòu)建PivotTable以及實(shí)現(xiàn)數(shù)據(jù)透視分析,并將其導(dǎo)出為PDF2023-10-10

