亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Vertx基于EventBus發(fā)送接受自定義對(duì)象

 更新時(shí)間:2020年11月16日 11:26:50   作者:侯賽雷  
這篇文章主要介紹了Vertx基于EventBus發(fā)送接受自定義對(duì)象,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

先看官方文檔步驟:

需要一個(gè)編解碼器,看源碼:

可見內(nèi)置了需要數(shù)據(jù)類型的實(shí)現(xiàn),所以發(fā)送其他消息可以發(fā)送,但是如果發(fā)送自定義對(duì)象就需要自己實(shí)現(xiàn)編解碼邏輯了

一 自定義編解碼器

/**
 * 自定義對(duì)象編解碼器,兩個(gè)類型可用于消息轉(zhuǎn)換,即發(fā)送對(duì)象轉(zhuǎn)換為接受需要的對(duì)象
 */
public class CustomizeMessageCodec implements MessageCodec<OrderMessage, OrderMessage> {
  /**
   * 將消息實(shí)體封裝到Buffer用于傳輸
   * 實(shí)現(xiàn)方式:使用對(duì)象流從對(duì)象中獲取Byte數(shù)組然后追加到Buffer
   */
  @Override
  public void encodeToWire(Buffer buffer, OrderMessage orderMessage) {
    final ByteArrayOutputStream b = new ByteArrayOutputStream();
    try (ObjectOutputStream o = new ObjectOutputStream(b)){
      o.writeObject(orderMessage);
      o.close();
      buffer.appendBytes(b.toByteArray());
    } catch (IOException e) { e.printStackTrace(); }
  }
  //從Buffer中獲取消息對(duì)象
  @Override
  public OrderMessage decodeFromWire(int pos, Buffer buffer) {
    final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());
    OrderMessage msg = null;
    try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();
    } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }
    return msg;
  }
  //消息轉(zhuǎn)換
  @Override
  public OrderMessage transform(OrderMessage orderMessage) {
    System.out.println("消息轉(zhuǎn)換---");//可對(duì)接受消息進(jìn)行轉(zhuǎn)換,比如轉(zhuǎn)換成另一個(gè)對(duì)象等
    orderMessage.setName("姚振");
    return orderMessage;
  }
  @Override
  public String name() { return "myCodec"; }
  //識(shí)別是否是用戶自定義編解碼器,通常為-1
  @Override
  public byte systemCodecID() { return -1; }
  public static MessageCodec create() {
    return new CustomizeMessageCodec();
  }
}

這里有一個(gè)點(diǎn)要注意,nam方法是必須的,且發(fā)送的時(shí)候一定要指明name

二 發(fā)送消息編寫

public class ProducerVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    EventBus eventBus = vertx.eventBus();
    //發(fā)布消息(群發(fā))
    eventBus.publish("com.hou", "群發(fā)祝福!");
    //發(fā)送消息(單發(fā)),只會(huì)發(fā)送注冊(cè)此地址的一個(gè),采用不嚴(yán)格的輪詢算法選擇
    DeliveryOptions options = new DeliveryOptions();//設(shè)置消息頭等
    options.addHeader("some-header", "some-value");
    eventBus.send("com.hou", "單發(fā)消息",options,ar->{
      if(ar.succeeded()) System.out.println("收到消費(fèi)者確認(rèn)信息:"+ar.result().body());
    });
    //發(fā)送自定義對(duì)象,需要編解碼器
    eventBus.registerCodec(CustomizeMessageCodec.create());//注冊(cè)編碼器
    DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必須指定名字
    OrderMessage orderMessage = new OrderMessage();
    orderMessage.setName("侯征");
    eventBus.send("com.hou", orderMessage, options1);
  }
}

三 接受消息Verticle編寫

public class ConsumerVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    //每個(gè)Vertx實(shí)例默認(rèn)是單例
    EventBus eb = vertx.eventBus();
    //注冊(cè)處理器,消費(fèi)com.hou發(fā)送的消息
    MessageConsumer<Object> consumer = eb.consumer("com.hou");//訂閱地址
    consumer.handler(message -> {//消息處理器
      if(message.body() instanceof OrderMessage){
        System.out.println("接受到對(duì)象: " + ((OrderMessage) message.body()).getName());
      }
      System.out.println("我是普通消費(fèi)者: " + message.body());
      message.reply("收到了!"); // 回復(fù)生產(chǎn)者,send才能接受
    }).completionHandler(res -> {//注冊(cè)完成后通知事件,適用于集群中比較慢的情況下
        System.out.println("注冊(cè)處理器結(jié)果"+res.succeeded());
    });
    //撤銷處理器
    //consumer.unregister();
  }
}

四 注冊(cè)部署Verticcle

vertx.deployVerticle(ConsumerVerticle.class.getName());
    TimeUnit.SECONDS.sleep(1);
    vertx.deployVerticle(ProducerVerticle.class.getName());

五 測(cè)試

以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論