亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Netty + ZooKeeper 實(shí)現(xiàn)簡(jiǎn)單的服務(wù)注冊(cè)與發(fā)現(xiàn)

 更新時(shí)間:2019年06月17日 10:34:18   作者:fengzhizi715  
服務(wù)注冊(cè)和發(fā)現(xiàn)一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊(cè)中心,如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的服務(wù)注冊(cè)和發(fā)現(xiàn)。,需要的朋友可以參考下

一. 背景

最近的一個(gè)項(xiàng)目:我們的系統(tǒng)接收到上游系統(tǒng)的派單任務(wù)后,會(huì)推送到指定的門店的相關(guān)設(shè)備,并進(jìn)行相應(yīng)的業(yè)務(wù)處理。

二. Netty 的使用

在接收到派單任務(wù)之后,通過 Netty 推送到指定門店相關(guān)的設(shè)備。在我們的系統(tǒng)中 Netty 實(shí)現(xiàn)了消息推送、長(zhǎng)連接以及心跳機(jī)制。

2.1 Netty Server 端:

每個(gè) Netty 服務(wù)端通過 ConcurrentHashMap 保存了客戶端的 clientId 以及它連接的 SocketChannel。

服務(wù)器端向客戶端發(fā)送消息時(shí),只要獲取 clientId 對(duì)應(yīng)的 SocketChannel,往 SocketChannel 里寫入相應(yīng)的 message 即可。

EventLoopGroup boss = new NioEventLoopGroup(1);
  EventLoopGroup worker = new NioEventLoopGroup();
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(boss, worker)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .option(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new MessageEncoder());
      p.addLast(new MessageDecoder());
      p.addLast(new PushServerHandler());
     }
    });
  ChannelFuture future = bootstrap.bind(host,port).sync();
  if (future.isSuccess()) {
   logger.info("server start...");
  }

2.2 Netty Client 端:

客戶端用于接收服務(wù)端的消息,隨即進(jìn)行業(yè)務(wù)處理??蛻舳诉€有心跳機(jī)制,它通過 IdleEvent 事件定時(shí)向服務(wù)端放送 Ping 消息以此來檢測(cè) SocketChannel 是否中斷。

public PushClientBootstrap(String host, int port) throws InterruptedException {
  this.host = host;
  this.port = port;
  start(host,port);
 }
 private void start(String host, int port) throws InterruptedException {
  bootstrap = new Bootstrap();
  bootstrap.channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .group(workGroup)
    .remoteAddress(host, port)
    .handler(new ChannelInitializer(){
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new IdleStateHandler(20, 10, 0)); // IdleStateHandler 用于檢測(cè)心跳
      p.addLast(new MessageDecoder());
      p.addLast(new MessageEncoder());
      p.addLast(new PushClientHandler());
     }
    });
  doConnect(port, host);
 }
 /**
  * 建立連接,并且可以實(shí)現(xiàn)自動(dòng)重連.
  * @param port port.
  * @param host host.
  * @throws InterruptedException InterruptedException.
  */
 private void doConnect(int port, String host) throws InterruptedException {
  if (socketChannel != null && socketChannel.isActive()) {
   return;
  }
  final int portConnect = port;
  final String hostConnect = host;
  ChannelFuture future = bootstrap.connect(host, port);
  future.addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture futureListener) throws Exception {
    if (futureListener.isSuccess()) {
     socketChannel = (SocketChannel) futureListener.channel();
     logger.info("Connect to server successfully!");
    } else {
     logger.info("Failed to connect to server, try connect after 10s");
     futureListener.channel().eventLoop().schedule(new Runnable() {
      @Override
      public void run() {
       try {
        doConnect(portConnect, hostConnect);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      }
     }, 10, TimeUnit.SECONDS);
    }
   }
  }).sync();
 }

三. 借助 ZooKeeper 實(shí)現(xiàn)簡(jiǎn)單的服務(wù)注冊(cè)與發(fā)現(xiàn)

3.1 服務(wù)注冊(cè)

服務(wù)注冊(cè)本質(zhì)上是為了解耦服務(wù)提供者和服務(wù)消費(fèi)者。服務(wù)注冊(cè)是一個(gè)高可用強(qiáng)一致性的服務(wù)發(fā)現(xiàn)存儲(chǔ)倉庫,主要用來存儲(chǔ)服務(wù)的api和地址對(duì)應(yīng)關(guān)系。為了高可用,服務(wù)注冊(cè)中心一般為一個(gè)集群,并且能夠保證分布式一致性。目前常用的有 ZooKeeper、Etcd 等等。

在我們項(xiàng)目中采用了 ZooKeeper 實(shí)現(xiàn)服務(wù)注冊(cè)。

public class ServiceRegistry {
 private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
 private CountDownLatch latch = new CountDownLatch(1);
 private String registryAddress;
 public ServiceRegistry(String registryAddress) {
  this.registryAddress = registryAddress;
 }
 public void register(String data) {
  if (data != null) {
   ZooKeeper zk = connectServer();
   if (zk != null) {
    createNode(zk, data);
   }
  }
 }
 /**
  * 連接 zookeeper 服務(wù)器
  * @return
  */
 private ZooKeeper connectServer() {
  ZooKeeper zk = null;
  try {
   zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getState() == Event.KeeperState.SyncConnected) {
      latch.countDown();
     }
    }
   });
   latch.await();
  } catch (IOException | InterruptedException e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 創(chuàng)建節(jié)點(diǎn)
  * @param zk
  * @param data
  */
 private void createNode(ZooKeeper zk, String data) {
  try {
   byte[] bytes = data.getBytes();
   String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
   logger.debug("create zookeeper node ({} => {})", path, data);
  } catch (KeeperException | InterruptedException e) {
   logger.error("", e);
  }
 }
}

有了服務(wù)注冊(cè),在 Netty 服務(wù)端啟動(dòng)之后,將 Netty 服務(wù)端的 ip 和 port 注冊(cè)到 ZooKeeper。

EventLoopGroup boss = new NioEventLoopGroup(1);
  EventLoopGroup worker = new NioEventLoopGroup();
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(boss, worker)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .option(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new MessageEncoder());
      p.addLast(new MessageDecoder());
      p.addLast(new PushServerHandler());
     }
    });
  ChannelFuture future = bootstrap.bind(host,port).sync();
  if (future.isSuccess()) {
   logger.info("server start...");
  }
  if (serviceRegistry != null) {
   serviceRegistry.register(host + ":" + port);
  }

3.2 服務(wù)發(fā)現(xiàn)

這里我們采用的是客戶端的服務(wù)發(fā)現(xiàn),即服務(wù)發(fā)現(xiàn)機(jī)制由客戶端實(shí)現(xiàn)。

客戶端在和服務(wù)端建立連接之前,通過查詢注冊(cè)中心的方式來獲取服務(wù)端的地址。如果存在有多個(gè) Netty 服務(wù)端的話,可以做服務(wù)的負(fù)載均衡。在我們的項(xiàng)目中只采用了簡(jiǎn)單的隨機(jī)法進(jìn)行負(fù)載。

public class ServiceDiscovery {
 private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);
 private CountDownLatch latch = new CountDownLatch(1);
 private volatile List<String> serviceAddressList = new ArrayList<>();
 private String registryAddress; // 注冊(cè)中心的地址
 public ServiceDiscovery(String registryAddress) {
  this.registryAddress = registryAddress;
  ZooKeeper zk = connectServer();
  if (zk != null) {
   watchNode(zk);
  }
 }
 /**
  * 通過服務(wù)發(fā)現(xiàn),獲取服務(wù)提供方的地址
  * @return
  */
 public String discover() {
  String data = null;
  int size = serviceAddressList.size();
  if (size > 0) {
   if (size == 1) { //只有一個(gè)服務(wù)提供方
    data = serviceAddressList.get(0);
    logger.info("unique service address : {}", data);
   } else {   //使用隨機(jī)分配法。簡(jiǎn)單的負(fù)載均衡法
    data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
    logger.info("choose an address : {}", data);
   }
  }
  return data;
 }
 /**
  * 連接 zookeeper
  * @return
  */
 private ZooKeeper connectServer() {
  ZooKeeper zk = null;
  try {
   zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
      latch.countDown();
     }
    }
   });
   latch.await();
  } catch (IOException | InterruptedException e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 獲取服務(wù)地址列表
  * @param zk
  */
 private void watchNode(final ZooKeeper zk) {
  try {
   //獲取子節(jié)點(diǎn)列表
   List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getType() == Event.EventType.NodeChildrenChanged) {
      //發(fā)生子節(jié)點(diǎn)變化時(shí)再次調(diào)用此方法更新服務(wù)地址
      watchNode(zk);
     }
    }
   });
   List<String> dataList = new ArrayList<>();
   for (String node : nodeList) {
    byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);
    dataList.add(new String(bytes));
   }
   logger.debug("node data: {}", dataList);
   this.serviceAddressList = dataList;
  } catch (KeeperException | InterruptedException e) {
   logger.error("", e);
  }
 }
}

Netty 客戶端啟動(dòng)之后,通過服務(wù)發(fā)現(xiàn)獲取 Netty 服務(wù)端的 ip 和 port。

/**
  * 支持通過服務(wù)發(fā)現(xiàn)來獲取 Socket 服務(wù)端的 host、port
  * @param discoveryAddress
  * @throws InterruptedException
  */
 public PushClientBootstrap(String discoveryAddress) throws InterruptedException {

  serviceDiscovery = new ServiceDiscovery(discoveryAddress);
  serverAddress = serviceDiscovery.discover();

  if (serverAddress!=null) {
   String[] array = serverAddress.split(":");
   if (array!=null && array.length==2) {

    String host = array[0];
    int port = Integer.parseInt(array[1]);

    start(host,port);
   }
  }
 }

四. 總結(jié)

服務(wù)注冊(cè)和發(fā)現(xiàn)一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊(cè)中心,如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的服務(wù)注冊(cè)和發(fā)現(xiàn)。其實(shí),注冊(cè)中心的選擇有很多,例如 Etcd、Eureka 等等。選擇符合我們業(yè)務(wù)需求的才是最重要的。

以上所述是小編給大家介紹的Netty + ZooKeeper 實(shí)現(xiàn)簡(jiǎn)單的服務(wù)注冊(cè)與發(fā)現(xiàn),希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
如果你覺得本文對(duì)你有幫助,歡迎轉(zhuǎn)載,煩請(qǐng)注明出處,謝謝!

相關(guān)文章

  • Java中super關(guān)鍵字的用法和細(xì)節(jié)

    Java中super關(guān)鍵字的用法和細(xì)節(jié)

    大家好,本篇文章主要講的是Java中super關(guān)鍵字的用法和細(xì)節(jié),感興趣的同學(xué)趕快來看一看吧,對(duì)你有幫助的話記得收藏一下
    2022-01-01
  • MyBatis?多表聯(lián)合查詢及優(yōu)化方法

    MyBatis?多表聯(lián)合查詢及優(yōu)化方法

    大家都知道Hibernate 是全自動(dòng)的數(shù)據(jù)庫持久層框架,它可以通過實(shí)體來映射數(shù)據(jù)庫,通過設(shè)置一對(duì)多、多對(duì)一、一對(duì)一、多對(duì)多的關(guān)聯(lián)來實(shí)現(xiàn)聯(lián)合查詢,接下來通過本文給大家介紹MyBatis?多表聯(lián)合查詢及優(yōu)化,需要的朋友可以參考下
    2022-08-08
  • Java Apollo環(huán)境搭建以及集成SpringBoot案例詳解

    Java Apollo環(huán)境搭建以及集成SpringBoot案例詳解

    這篇文章主要介紹了Java Apollo環(huán)境搭建以及集成SpringBoot案例詳解,本篇文章通過簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • 一文徹底了解Java的組合模式

    一文徹底了解Java的組合模式

    組合模式(Composite?Pattern)指將對(duì)象組合成樹形結(jié)構(gòu)以表示“部分-整體”的層次結(jié)構(gòu),?使得用戶對(duì)單個(gè)對(duì)象和組合對(duì)象的使用具有一致性。本文就來帶大家深入了解一下Java的組合模式吧
    2023-02-02
  • JDK8中的HashMap初始化和擴(kuò)容機(jī)制詳解

    JDK8中的HashMap初始化和擴(kuò)容機(jī)制詳解

    這篇文章主要介紹了JDK8中的HashMap初始化和擴(kuò)容機(jī)制,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • Java跨session實(shí)現(xiàn)token接口測(cè)試過程圖解

    Java跨session實(shí)現(xiàn)token接口測(cè)試過程圖解

    這篇文章主要介紹了Java跨session實(shí)現(xiàn)token接口測(cè)試過程圖解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-04-04
  • Java實(shí)現(xiàn)聯(lián)系人管理系統(tǒng)

    Java實(shí)現(xiàn)聯(lián)系人管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)聯(lián)系人管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • 從內(nèi)存模型中了解Java final的全部細(xì)節(jié)

    從內(nèi)存模型中了解Java final的全部細(xì)節(jié)

    關(guān)于final關(guān)鍵字,它也是我們一個(gè)經(jīng)常用的關(guān)鍵字,可以修飾在類上、或者修飾在變量、方法上,以此看來定義它的一些不可變性!像我們經(jīng)常使用的String類中,它便是final來修飾的類,并且它的字符數(shù)組也是被final所修飾的。但是一些final的一些細(xì)節(jié)你真的了解過嗎
    2022-03-03
  • SpringCloud項(xiàng)目中Feign組件添加請(qǐng)求頭所遇到的坑及解決

    SpringCloud項(xiàng)目中Feign組件添加請(qǐng)求頭所遇到的坑及解決

    這篇文章主要介紹了SpringCloud項(xiàng)目中Feign組件添加請(qǐng)求頭所遇到的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • Spring SpringMVC在啟動(dòng)完成后執(zhí)行方法源碼解析

    Spring SpringMVC在啟動(dòng)完成后執(zhí)行方法源碼解析

    這篇文章主要介紹了SpringMVC在啟動(dòng)完成后執(zhí)行方法源碼解析,還是非常不錯(cuò)的,在這里分享給大家,需要的朋友可以參考下。
    2017-09-09

最新評(píng)論