亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java實現(xiàn)心跳機制的方法

 更新時間:2020年07月09日 16:20:00   作者:蒼穹2018  
這篇文章主要介紹了Java實現(xiàn)心跳機制的方法,文中講解非常細(xì)致,幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下

一、心跳機制簡介

     在分布式系統(tǒng)中,分布在不同主機上的節(jié)點需要檢測其他節(jié)點的狀態(tài),如服務(wù)器節(jié)點需要檢測從節(jié)點是否失效。為了檢測對方節(jié)點的有效性,每隔固定時間就發(fā)送一個固定信息給對方,對方回復(fù)一個固定信息,如果長時間沒有收到對方的回復(fù),則斷開與對方的連接。

     發(fā)包方既可以是服務(wù)端,也可以是客戶端,這要看具體實現(xiàn)。因為是每隔固定時間發(fā)送一次,類似心跳,所以發(fā)送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據(jù)具體實現(xiàn)。心跳包主要應(yīng)用于長連接的保持與短線鏈接。

      一般而言,應(yīng)該客戶端主動向服務(wù)器發(fā)送心跳包,因為服務(wù)器向客戶端發(fā)送心跳包會影響服務(wù)器的性能。

二、心跳機制實現(xiàn)方式

    心跳機制有兩種實現(xiàn)方式,一種基于TCP自帶的心跳包,TCP的SO_KEEPALIVE選項可以,系統(tǒng)默認(rèn)的默認(rèn)跳幀頻率為2小時,超過2小時后,本地的TCP 實現(xiàn)會發(fā)送一個數(shù)據(jù)包給遠(yuǎn)程的 Socket. 如果遠(yuǎn)程Socket 沒有發(fā)回響應(yīng), TCP實現(xiàn)就會持續(xù)嘗試 11 分鐘, 直到接收到響應(yīng)為止。 否則就會自動斷開Socket連接。但TCP自帶的心跳包無法檢測比較敏感地知道對方的狀態(tài),默認(rèn)2小時的空閑時間,對于大多數(shù)的應(yīng)用而言太長了??梢允止ら_啟KeepAlive功能并設(shè)置合理的KeepAlive參數(shù)。

    另一種在應(yīng)用層自己進行實現(xiàn),基本步驟如下:

Client使用定時器,不斷發(fā)送心跳;
Server收到心跳后,回復(fù)一個包;
Server為每個Client啟動超時定時器,如果在指定時間內(nèi)沒有收到Client的心跳包,則Client失效。

三、Java實現(xiàn)心跳機制

    這里基于Java實現(xiàn)的簡單RPC框架實現(xiàn)心跳機制。Java實現(xiàn)代碼如下所示:

    心跳客戶端類:

public class HeartbeatClient implements Runnable {
 
 private String serverIP = "127.0.0.1";
 private int serverPort = 8089;
 private String nodeID = UUID.randomUUID().toString();
 private boolean isRunning = true;
 // 最近的心跳時間
 private long lastHeartbeat;
 // 心跳間隔時間
 private long heartBeatInterval = 10 * 1000;
 
 public void run() {
 try {
  while (isRunning) {
  HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));
  long startTime = System.currentTimeMillis();
  // 是否達到發(fā)送心跳的周期時間
  if (startTime - lastHeartbeat > heartBeatInterval) {
   System.out.println("send a heart beat");
   lastHeartbeat = startTime;
 
   HeartbeatEntity entity = new HeartbeatEntity();
   entity.setTime(startTime);
   entity.setNodeID(nodeID);
 
   // 向服務(wù)器發(fā)送心跳,并返回需要執(zhí)行的命令
   Cmder cmds = handler.sendHeartBeat(entity);
 
   if (!processCommand(cmds))
   continue;
  }
  }
 } catch (Exception e) {
  e.printStackTrace();
 }
 }
 
 private boolean processCommand(Cmder cmds) {
 // ...
 return true;
 }
 
}

      心跳包實體類:

public class HeartbeatEntity implements Serializable {
 
 private long time;
 private String nodeID;
 private String error;
 private Map<String, Object> info = new HashMap<String, Object>();
 
 public String getNodeID() {
 return nodeID;
 }
 
 public void setNodeID(String nodeID) {
 this.nodeID = nodeID;
 }
 
 public String getError() {
 return error;
 }
 
 public void setError(String error) {
 this.error = error;
 }
 
 public Map<String, Object> getInfo() {
 return info;
 }
 
 public void setInfo(Map<String, Object> info) {
 this.info = info;
 }
 
 public long getTime() {
 return time;
 }
 
 public void setTime(long time) {
 this.time = time;
 }
}

  服務(wù)器接受心跳包返回的命令對象類:

public class Cmder implements Serializable {
 
 private String nodeID;
 private String error;
 private Map<String, Object> info = new HashMap<String, Object>();
 
 public String getNodeID() {
 return nodeID;
 }
 
 public void setNodeID(String nodeID) {
 this.nodeID = nodeID;
 }
 
 public String getError() {
 return error;
 }
 
 public void setError(String error) {
 this.error = error;
 }
 
 public Map<String, Object> getInfo() {
 return info;
 }
 
 public void setInfo(Map<String, Object> info) {
 this.info = info;
 }
}

  RPC服務(wù)注冊中心:

public class ServiceCenter {
 
 private ExecutorService executor = Executors.newFixedThreadPool(20);
 
 private final ConcurrentHashMap<String, Class> serviceRegistry = new ConcurrentHashMap<String, Class>();
 
 private AtomicBoolean isRunning = new AtomicBoolean(true);
 
 // 服務(wù)器監(jiān)聽端口
 private int port = 8089;
 
 // 心跳監(jiān)聽器
 HeartbeatLinstener linstener;
 
 // 單例模式
 private static class SingleHolder {
 private static final ServiceCenter INSTANCE = new ServiceCenter();
 }
 
 private ServiceCenter() {
 }
 
 public static ServiceCenter getInstance() {
 return SingleHolder.INSTANCE;
 }
 
 public void register(Class serviceInterface, Class impl) {
 System.out.println("regeist service " + serviceInterface.getName());
 serviceRegistry.put(serviceInterface.getName(), impl);
 }
 
 public void start() throws IOException {
 ServerSocket server = new ServerSocket();
 server.bind(new InetSocketAddress(port));
 System.out.println("start server");
 linstener = HeartbeatLinstener.getInstance();
 System.out.println("start listen heart beat");
 try {
  while (true) {
  // 1.監(jiān)聽客戶端的TCP連接,接到TCP連接后將其封裝成task,由線程池執(zhí)行
  executor.execute(new ServiceTask(server.accept()));
  }
 } finally {
  server.close();
 }
 }
 
 public void stop() {
 isRunning.set(false);
 executor.shutdown();
 }
 
 
 public boolean isRunning() {
 return isRunning.get();
 }
 
 public int getPort() {
 return port;
 }
 
 public void settPort(int port) {
 this.port = port;
 }
 
 public ConcurrentHashMap<String, Class> getServiceRegistry() {
 return serviceRegistry;
 }
 
 private class ServiceTask implements Runnable {
 Socket clent = null;
 
 public ServiceTask(Socket client) {
  this.clent = client;
 }
 
 public void run() {
  ObjectInputStream input = null;
  ObjectOutputStream output = null;
  try {
  // 2.將客戶端發(fā)送的碼流反序列化成對象,反射調(diào)用服務(wù)實現(xiàn)者,獲取執(zhí)行結(jié)果
  input = new ObjectInputStream(clent.getInputStream());
  String serviceName = input.readUTF();
  String methodName = input.readUTF();
  Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
  Object[] arguments = (Object[]) input.readObject();
  Class serviceClass = serviceRegistry.get(serviceName);
  if (serviceClass == null) {
   throw new ClassNotFoundException(serviceName + " not found");
  }
  Method method = serviceClass.getMethod(methodName, parameterTypes);
  Object result = method.invoke(serviceClass.newInstance(), arguments);
 
  // 3.將執(zhí)行結(jié)果反序列化,通過socket發(fā)送給客戶端
  output = new ObjectOutputStream(clent.getOutputStream());
  output.writeObject(result);
  } catch (Exception e) {
  e.printStackTrace();
  } finally {
  if (output != null) {
   try {
   output.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  if (input != null) {
   try {
   input.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  if (clent != null) {
   try {
   clent.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  }
 
 }
 }
}

  心跳監(jiān)聽類:

package com.cang.heartbeat;
 
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * 心跳監(jiān)聽保存信息
 *
 * @author cang
 * @create_time 2016-09-28 11:40
 */
public class HeartbeatLinstener {
 
 private ExecutorService executor = Executors.newFixedThreadPool(20);
 
 private final ConcurrentHashMap<String, Object> nodes = new ConcurrentHashMap<String, Object>();
 private final ConcurrentHashMap<String, Long> nodeStatus = new ConcurrentHashMap<String, Long>();
 
 private long timeout = 10 * 1000;
 
 // 服務(wù)器監(jiān)聽端口
 private int port = 8089;
 
 // 單例模式
 private static class SingleHolder {
 private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();
 }
 
 private HeartbeatLinstener() {
 }
 
 public static HeartbeatLinstener getInstance() {
 return SingleHolder.INSTANCE;
 }
 
 public ConcurrentHashMap<String, Object> getNodes() {
 return nodes;
 }
 
 public void registerNode(String nodeId, Object nodeInfo) {
 nodes.put(nodeId, nodeInfo);
 nodeStatus.put(nodeId, System.currentTimeMillis());
 }
 
 public void removeNode(String nodeID) {
 if (nodes.containsKey(nodeID)) {
  nodes.remove(nodeID);
 }
 }
 
 // 檢測節(jié)點是否有效
 public boolean checkNodeValid(String key) {
 if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;
 if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;
 return true;
 }
 
 // 刪除所有失效節(jié)點
 public void removeInValidNode() {
 Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();
 while (it.hasNext()) {
  Map.Entry<String, Long> e = it.next();
  if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {
  nodes.remove(e.getKey());
  }
 }
 }
 
}

  心跳處理類接口:

public interface HeartbeatHandler {
 public Cmder sendHeartBeat(HeartbeatEntity info);
}

      心跳處理實現(xiàn)類:

public class HeartbeatHandlerImpl implements HeartbeatHandler {
 public Cmder sendHeartBeat(HeartbeatEntity info) {
 HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();
 
 // 添加節(jié)點
 if (!linstener.checkNodeValid(info.getNodeID())) {
  linstener.registerNode(info.getNodeID(), info);
 }
 
 // 其他操作
 Cmder cmder = new Cmder();
 cmder.setNodeID(info.getNodeID());
 // ...
 
 System.out.println("current all the nodes: ");
 Map<String, Object> nodes = linstener.getNodes();
 for (Map.Entry e : nodes.entrySet()) {
  System.out.println(e.getKey() + " : " + e.getValue());
 }
 System.out.println("hadle a heartbeat");
 return cmder;
 }
}

  測試類:

public class HeartbeatTest {
 
 public static void main(String[] args) {
 new Thread(new Runnable() {
  public void run() {
  try {
   ServiceCenter serviceServer = ServiceCenter.getInstance();
   serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);
   serviceServer.start();
  } catch (IOException e) {
   e.printStackTrace();
  }
  }
 }).start();
 Thread client1 = new Thread(new HeartbeatClient());
 client1.start();
 Thread client2 = new Thread(new HeartbeatClient());
 client2.start();
 }
}

四、總結(jié)

    上面的代碼還有很多不足的地方,希望有空能進行改善:

  •  配置為硬編碼;
  •  命令類Cmder沒有實際實現(xiàn),返回的Cmder對象沒有實際進行處理;

其他小問題就暫時不管了,希望以后能重寫上面的代碼。

以上就是Java實現(xiàn)心跳機制的方法的詳細(xì)內(nèi)容,更多關(guān)于Java實現(xiàn)心跳機制的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Spring Boot 文件上傳與下載的示例代碼

    Spring Boot 文件上傳與下載的示例代碼

    這篇文章主要介紹了Spring Boot 文件上傳與下載的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • Java并發(fā)之Semaphore工具類r的全面解析

    Java并發(fā)之Semaphore工具類r的全面解析

    Semaphore 是 java.util.concurrent中非常有用的并發(fā)編程工具類,它通常被用于限制對某個資源或資源池的并發(fā)訪問數(shù)量,下面我們就來深入了解一下Semaphore的具體使用吧
    2024-02-02
  • IDEA遠(yuǎn)程部署調(diào)試Java應(yīng)用程序的詳細(xì)流程

    IDEA遠(yuǎn)程部署調(diào)試Java應(yīng)用程序的詳細(xì)流程

    這篇文章主要介紹了IDEA遠(yuǎn)程部署調(diào)試Java應(yīng)用程序,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2021-10-10
  • java ReentrantLock詳解

    java ReentrantLock詳解

    這篇文章主要介紹了java ReentrantLock,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04
  • SpringBoot中使用Servlet的兩種方式小結(jié)

    SpringBoot中使用Servlet的兩種方式小結(jié)

    這篇文章主要介紹了SpringBoot中使用Servlet的兩種方式小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • Spring中的BeanFactory工廠詳細(xì)解析

    Spring中的BeanFactory工廠詳細(xì)解析

    這篇文章主要介紹了Spring中的BeanFactory工廠詳細(xì)解析,Spring的本質(zhì)是一個bean工廠(beanFactory)或者說bean容器,它按照我們的要求,生產(chǎn)我們需要的各種各樣的bean,提供給我們使用,需要的朋友可以參考下
    2023-12-12
  • springboot項目配置多數(shù)據(jù)庫連接的示例詳解

    springboot項目配置多數(shù)據(jù)庫連接的示例詳解

    這篇文章主要介紹了springboot項目配置多數(shù)據(jù)庫連接的示例,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2023-12-12
  • 使用spring整合Quartz實現(xiàn)—定時器功能

    使用spring整合Quartz實現(xiàn)—定時器功能

    這篇文章主要介紹了使用spring整合Quartz實現(xiàn)—定時器功能,不基于特定的基類的方法,需要的朋友可以參考下
    2018-04-04
  • SpringBoot整合Mybatis-Plus、Jwt實現(xiàn)登錄token設(shè)置

    SpringBoot整合Mybatis-Plus、Jwt實現(xiàn)登錄token設(shè)置

    Spring Boot整合Mybatis-plus實現(xiàn)登錄常常需要使用JWT來生成用戶的token并設(shè)置用戶權(quán)限的攔截器,本文就來詳細(xì)的介紹一下,具有一定的參考價值,感興趣的可以了解一下
    2024-02-02
  • 使用Java創(chuàng)建數(shù)據(jù)透視表并導(dǎo)出為PDF的方法

    使用Java創(chuàng)建數(shù)據(jù)透視表并導(dǎo)出為PDF的方法

    數(shù)據(jù)透視分析是一種強大的工具,可以幫助我們從大量數(shù)據(jù)中提取有用信息并進行深入分析,本文將介紹如何使用Java來構(gòu)建PivotTable以及實現(xiàn)數(shù)據(jù)透視分析,并將其導(dǎo)出為PDF
    2023-10-10

最新評論