亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringAOP+RabbitMQ+WebSocket實戰(zhàn)詳解

 更新時間:2018年11月10日 08:43:19   作者:little-sheep  
這篇文章主要介紹了SpringAOP+RabbitMQ+WebSocket實戰(zhàn)詳解,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

背景

最近公司的客戶要求,分配給員工的任務除了有微信通知外,還希望PC端的網頁也能實時收到通知。管理員分配任務是在我們的系統(tǒng)A,而員工接受任務是在系統(tǒng)B。兩個系統(tǒng)都是現在已投入使用的系統(tǒng)。

技術選型

根據需求我們最終選用SpringAOP+RabbitMQ+WebSocket。

SpringAOP可以讓我們不修改原有代碼,直接將原有service作為切點,加入切面。RabbitMQ可以讓A系統(tǒng)和B系統(tǒng)解耦。WebSocket則可以達到實時通知的要求。

SpringAOP

AOP稱為面向切面編程,在程序開發(fā)中主要用來解決一些系統(tǒng)層面上的問題,比如日志,事務,權限等待。是Spring的核心模塊,底層是通過動態(tài)代理來實現(動態(tài)代理將在之后的文章重點介紹)。

基本概念

Aspect(切面):通常是一個類,里面可以定義切入點和通知。

JointPoint(連接點):程序執(zhí)行過程中明確的點,一般是方法的調用。

Advice(通知):AOP在特定的切入點上執(zhí)行的增強處理,有before,after,afterReturning,afterThrowing,around。

Pointcut(切入點):就是帶有通知的連接點,在程序中主要體現為書寫切入點表達式。

通知類型

Before:在目標方法被調用之前做增強處理。

@Before只需要指定切入點表達式即可

AfterReturning:在目標方法正常完成后做增強。

@AfterReturning除了指定切入點表達式后,還可以指定一個返回值形參名returning,代表目標方法的返回值

AfterThrowing:主要用來處理程序中未處理的異常。

@AfterThrowing除了指定切入點表達式后,還可以指定一個throwing的返回值形參名,可以通過該形參名

來訪問目標方法中所拋出的異常對象

After:在目標方法完成之后做增強,無論目標方法時候成功完成。

@After可以指定一個切入點表達式

Around:環(huán)繞通知,在目標方法完成前后做增強處理,環(huán)繞通知是最重要的通知類型,像事務,日志等都是環(huán)繞通知,注意編程中核心是一個ProceedingJoinPoint。

RabbitMQ

從圖中我們可以看到RabbitMQ主要的結構有:Routing、Binding、Exchange、Queue。

Queue

Queue(隊列)RabbitMQ的作用是存儲消息,隊列的特性是先進先出。

Exchange

生產者產生的消息并不是直接發(fā)送給消息隊列Queue的,而是要經過Exchange(交換器),由Exchange再將消息路由到一個或多個Queue,還會將不符合路由規(guī)則的消息丟棄。

Routing

用于標記或生產者尋找Exchange。

Binding

用于Exchange和Queue做關聯。

Exchange Type fanout

fanout類型的Exchange路由規(guī)則非常簡單,它會把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。

direct

direct會把消息路由到那些binding key與routing key完全匹配的Queue中。

topic

direct規(guī)則是嚴格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時候才將消息傳送給Queue,那么topic這個規(guī)則就是模糊匹配,可以通過通配符滿足一部分規(guī)則就可以傳送。

headers

headers類型的Exchange不依賴于routing key與binding key的匹配規(guī)則來路由消息,而是根據發(fā)送的消息內容中的headers屬性進行匹配。

WebSocket

了解websocket必須先知道幾個常用的web通信技術及其區(qū)別。

短輪詢

短輪詢的基本思路就是瀏覽器每隔一段時間向瀏覽器發(fā)送http請求,服務器端在收到請求后,不論是否有數據更新,都直接進行響應。這種方式實現的即時通信,本質上還是瀏覽器發(fā)送請求,服務器接受請求的一個過程,通過讓客戶端不斷的進行請求,使得客戶端能夠模擬實時地收到服務器端的數據的變化。

這種方式的優(yōu)點是比較簡單,易于理解,實現起來也沒有什么技術難點。缺點是顯而易見的,這種方式由于需要不斷的建立http連接,嚴重浪費了服務器端和客戶端的資源。尤其是在客戶端,距離來說,如果有數量級想對比較大的人同時位于基于短輪詢的應用中,那么每一個用戶的客戶端都會瘋狂的向服務器端發(fā)送http請求,而且不會間斷。人數越多,服務器端壓力越大,這是很不合理的。

因此短輪詢不適用于那些同時在線用戶數量比較大,并且很注重性能的Web應用。

長輪詢/ comet

comet指的是,當服務器收到客戶端發(fā)來的請求后,不會直接進行響應,而是先將這個請求掛起,然后判斷服務器端數據是否有更新。如果有更新,則進行響應,如果一直沒有數據,則到達一定的時間限制(服務器端設置)后關閉連接。

長輪詢和短輪詢比起來,明顯減少了很多不必要的http請求次數,相比之下節(jié)約了資源。長輪詢的缺點在于,連接掛起也會導致資源的浪費。

SSE

SSE是HTML5新增的功能,全稱為Server-Sent Events。它可以允許服務推送數據到客戶端。SSE在本質上就與之前的長輪詢、短輪詢不同,雖然都是基于http協(xié)議的,但是輪詢需要客戶端先發(fā)送請求。而SSE最大的特點就是不需要客戶端發(fā)送請求,可以實現只要服務器端數據有更新,就可以馬上發(fā)送到客戶端。

SSE的優(yōu)勢很明顯,它不需要建立或保持大量的客戶端發(fā)往服務器端的請求,節(jié)約了很多資源,提升應用性能。并且SSE的實現非常簡單,不需要依賴其他插件。

WebSocket

WebSocket是Html5定義的一個新協(xié)議,與傳統(tǒng)的http協(xié)議不同,該協(xié)議可以實現服務器與客戶端之間全雙工通信。簡單來說,首先需要在客戶端和服務器端建立起一個連接,這部分需要http。連接一旦建立,客戶端和服務器端就處于平等的地位,可以相互發(fā)送數據,不存在請求和響應的區(qū)別。

WebSocket的優(yōu)點是實現了雙向通信,缺點是服務器端的邏輯非常復雜?,F在針對不同的后臺語言有不同的插件可以使用。

四種Web即時通信技術比較

從兼容性角度考慮,短輪詢>長輪詢>長連接SSE>WebSocket;

從性能方面考慮,WebSocket>長連接SSE>長輪詢>短輪詢。

實戰(zhàn)

項目使用SpringBoot搭建。RabbitMQ的安裝這里不講述。

RabbitMQ配置

兩個系統(tǒng)A、B都需要操作RabbitMQ,其中A生產消息,B消費消息。故都需要配置。

1、首先引入RabbitMQ的dependency:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

這個dependency中包含了RabbitMQ相關dependency。

2、在項目的配置文件里配置為使用rabbitmq及其參數。

application-pro.yml

#消息隊列
message.queue.type: rabbitmq
## rabbit mq properties
rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest

application.properties

#將要使用的隊列名
rabbitmq.websocket.msg.queue=websocket_msg_queue

3、創(chuàng)建配置文件。隊列的創(chuàng)建交給spring。

RabbitMQConfig.java

@Configuration
@EnableRabbit
public class RabbitMQConfig {

  @Value("${rabbitmq.host}")
  private String host;
  @Value("${rabbitmq.port}")
  private String port;
  @Value("${rabbitmq.username}")
  private String username;
  @Value("${rabbitmq.password}")
  private String password;
  @Value("${rabbitmq.websocket.msg.queue}")
  private String webSocketMsgQueue;

  @Bean
  public ConnectionFactory connectionFactory() throws IOException {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setUsername(username);
    factory.setPassword(password);
//    factory.setVirtualHost("test");
    factory.setHost(host);
    factory.setPort(Integer.valueOf(port));
    factory.setPublisherConfirms(true);

    //設置隊列參數,是否持久化、隊列TTL、隊列消息TTL等
    factory.createConnection().createChannel(false).queueDeclare(webSocketMsgQueue, true, false, false, null);
    return factory;
  }

  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  // 必須是prototype類型
  public RabbitTemplate rabbitTemplate() throws IOException {
    return new RabbitTemplate(connectionFactory());
  }

  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
  }
}

4、系統(tǒng)B中創(chuàng)建隊列監(jiān)聽,當隊列有消息時,發(fā)送websocket通知。

RabbitMQListener.java

@Component
public class RabbitMQListener {

  @Autowired
  private RabbitMQService mqService;

  /**
   * WebSocket推送監(jiān)聽器
   * @param socketEntity
   * @param deliveryTag
   * @param channel
   */
  @RabbitListener(queues = "websocket_msg_queue")
  public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
    mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel);
  }

}

RabbitMQService.java

public class RabbitMQService {
  @Autowired
  private MessageWebSocketHandler messageWebSocketHandler;

  /**
   * @param socketMsgEntity
   * @param deliveryTag
   * @param channel
   * @throws IOException
   */
  void handleWebSocketMsg(WebSocketMsgEntity socketMsgEntity, long deliveryTag, Channel channel) throws IOException {
    try {
      messageWebSocketHandler.sendMessageToUsers(socketMsgEntity.toJsonString(), socketMsgEntity.getToUserIds());
      channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
      channel.basicNack(deliveryTag, false, false);
    }
  }
}

WebSocketMsgEntity為MQ中傳送的實體。

public class WebSocketMsgEntity implements Serializable {
  public enum OrderType{
    repair("維修"),
    maintain("保養(yǎng)"),
    measure("計量");

    OrderType(String value){
      this.value = value;
    }
    String value;

    public String getValue() {
      return value;
    }
  }
  //設備名稱
  private String EquName;
  //設備編號
  private String EquId;
  //工單類型
  private OrderType orderType;
  //工單單號
  private String orderId;
  //工單狀態(tài)
  private String orderStatus;
  //創(chuàng)建時間
  private Date createTime;
  //消息接收人ID
  private List<String> toUserIds;

  public String getEquName() {
    return EquName;
  }

  public void setEquName(String equName) {
    EquName = equName;
  }

  public String getOrderId() {
    return orderId;
  }

  public void setOrderId(String orderId) {
    this.orderId = orderId;
  }

  public String getEquId() {
    return EquId;
  }

  public void setEquId(String equId) {
    EquId = equId;
  }

  public String getOrderStatus() {
    return orderStatus;
  }

  public void setOrderStatus(String orderStatus) {
    this.orderStatus = orderStatus;
  }


  public OrderType getOrderType() {
    return orderType;
  }

  public void setOrderType(OrderType orderType) {
    this.orderType = orderType;
  }

  public Date getCreateTime() {
    return createTime;
  }

  public void setCreateTime(Date createTime) {
    this.createTime = createTime;
  }

  public List<String> getToUserIds() {
    return toUserIds;
  }

  public void setToUserIds(List<String> toUserIds) {
    this.toUserIds = toUserIds;
  }

  public String toJsonString(){
    return JSON.toJSONString(this);
  }
}

SpringAOP

1、系統(tǒng)A中創(chuàng)建一個切面類DataInterceptor.java

@Aspect
@Component
public class DataInterceptor {
  @Autowired
  private MessageQueueService queueService;


  //維修工單切點
  @Pointcut("execution(* com.zhishang.hes.common.service.impl.RepairServiceImpl.executeFlow(..))")
  private void repairMsg() {
  }

  /**
   * 返回通知,方法執(zhí)行正常返回時觸發(fā)
   *
   * @param joinPoint
   * @param result
   */
  @AfterReturning(value = "repairMsg()", returning = "result")
  public void afterReturning(JoinPoint joinPoint, Object result) {
    //此處可以獲得切點方法名
    //String methodName = joinPoint.getSignature().getName();
    EquipmentRepair equipmentRepair = (EquipmentRepair) result;
    WebSocketMsgEntity webSocketMsgEntity = this.generateRepairMsgEntity(equipmentRepair);
    if (webSocketMsgEntity == null) {
      return;
    }
    queueService.send(webSocketMsgEntity);
  }

  /**
   * 生成發(fā)送到MQ的維修消息
   *
   * @param equipmentRepair
   * @return
   */
  private WebSocketMsgEntity generateRepairMsgEntity(EquipmentRepair equipmentRepair) {
    WebSocketMsgEntity webSocketMsgEntity = generateRepairMsgFromTasks(equipmentRepair);
    return webSocketMsgEntity;
  }

  /**
   * 從任務中生成消息
   *
   * @param equipmentRepair
   * @return
   */
  private WebSocketMsgEntity generateRepairMsgFromTasks(EquipmentRepair equipmentRepair) {
    //業(yè)務代碼略
  }

}

2、發(fā)送消息到MQ。這里只貼了發(fā)送的核心代碼

public class RabbitMessageQueue extends AbstractMessageQueue {

  @Value("${rabbitmq.websocket.msg.queue}")
  private String webSocketMsgQueue;

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Override
  public void send(WebSocketMsgEntity entity) {
    //沒有指定exchange,則使用默認名為“”的exchange,binding名與queue名相同
    rabbitTemplate.convertAndSend(webSocketMsgQueue, entity);
  }
}

WebSocket

1、 系統(tǒng)B中引入websocket服務端dependency

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-websocket</artifactId>
  <version>4.3.10.RELEASE</version>
</dependency>

2、 配置websocket,添加處理類

WebSocketConfigurer.java

@Configuration
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

  private static Logger logger = LoggerFactory.getLogger(WebSocketConfig.class);

  @Override
  public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    //配置webSocket路徑
    registry.addHandler(messageWebSocketHandler(),"/msg-websocket").addInterceptors(new MyHandshakeInterceptor()).setAllowedOrigins("*");
    //配置webSocket路徑 支持前端使用socketJs
    registry.addHandler(messageWebSocketHandler(), "/sockjs/msg-websocket").setAllowedOrigins("*").addInterceptors(new MyHandshakeInterceptor()).withSockJS();
  }

  @Bean
  public MessageWebSocketHandler messageWebSocketHandler() {
    logger.info("......創(chuàng)建MessageWebSocketHandler......");
    return new MessageWebSocketHandler();
  }

}

MessageWebSocketHandler.java 主要用于websocket連接及消息發(fā)送處理。配置中還使用了連接握手時的處理,主要是取用戶登陸信息,這里不多講述。

public class MessageWebSocketHandler extends TextWebSocketHandler {
  private static Logger logger = LoggerFactory.getLogger(SystemWebSocketHandler.class);
  private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketSession>> users = new ConcurrentHashMap<>();

  @Override
  public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
    logger.info("......AfterConnectionEstablished......");
    logger.info("session.getId:" + session.getId());
    logger.info("session.getLocalAddress:" + session.getLocalAddress().toString());
    logger.info("userId:" + userId);
    //websocket連接后記錄連接信息
    if (users.keySet().contains(userId)) {
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
      webSocketSessions.add(session);
    } else {
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = new CopyOnWriteArraySet<>();
      webSocketSessions.add(session);
      users.put(userId, webSocketSessions);
    }
  }

  @Override
  public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
    removeUserSession(session);
    if (session.isOpen()) {
      session.close();
    }
    logger.info("異常出現handleTransportError" + throwable.getMessage());
  }

  @Override
  public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
    removeUserSession(session);
    logger.info("關閉afterConnectionClosed" + closeStatus.getReason());
  }

  @Override
  public boolean supportsPartialMessages() {
    return false;
  }

  /**
   * 給符合要求的在線用戶發(fā)送消息
   *
   * @param message
   */
  public void sendMessageToUsers(String message, List<String> userIds) throws IOException{
    if (StringUtils.isEmpty(message) || CollectionUtils.isEmpty(userIds)) {
      return;
    }
    if (users.isEmpty()) {
      return;
    }
    for (String userId : userIds) {
      if (!users.keySet().contains(userId)) {
        continue;
      }
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
      if (webSocketSessions == null) {
        continue;
      }
      for (WebSocketSession webSocketSession : webSocketSessions) {
        if (webSocketSession.isOpen()) {
          try {
            webSocketSession.sendMessage(new TextMessage(message));
          } catch (IOException e) {
            logger.error(" WebSocket server send message ERROR " + e.getMessage());
            try {
              throw e;
            } catch (IOException e1) {
              e1.printStackTrace();
            }
          }
        }
      }
    }
  }

  /**
   * websocket清除連接信息
   *
   * @param session
   */
  private void removeUserSession(WebSocketSession session) {
    String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
    if (users.keySet().contains(userId)) {
      CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
      webSocketSessions.remove(session);
      if (webSocketSessions.isEmpty()) {
        users.remove(userId);
      }
    }
  }
}

整個功能完成后,A系統(tǒng)分配任務時,系統(tǒng)B登陸用戶收到的消息如圖:

總體流程:

1、對于系統(tǒng)B,每個登陸的用戶都會和服務器建立websocket長連接。

2、系統(tǒng)A生成任務,AOP做出響應,將封裝的消息發(fā)送給MQ。

3、系統(tǒng)B中的MQ監(jiān)聽發(fā)現隊列有消息到達,消費消息。

4、系統(tǒng)B通過websocket長連接將消息發(fā)給指定的登陸用戶。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關文章

  • 通過Java實現文件斷點續(xù)傳功能

    通過Java實現文件斷點續(xù)傳功能

    用戶上傳大文件,網絡差點的需要歷時數小時,萬一線路中斷,不具備斷點續(xù)傳的服務器就只能從頭重傳,而斷點續(xù)傳就是,允許用戶從上傳斷線的地方繼續(xù)傳送,這樣大大減少了用戶的煩惱。本文將用Java語言實現斷點續(xù)傳,需要的可以參考一下
    2022-05-05
  • Java數據脫敏實現的方法總結

    Java數據脫敏實現的方法總結

    數據脫敏,指的是對某些敏感信息通過脫敏規(guī)則進行數據的變形,實現敏感隱私數據的可靠保護,本文主要是對后端數據脫敏實現的簡單總結,希望對大家有所幫助
    2023-07-07
  • Java中自然排序和比較器排序詳解

    Java中自然排序和比較器排序詳解

    這篇文章給大家介紹Java中的排序并不是指插入排序、希爾排序、歸并排序等具體的排序算法。而是自然排序和比較器排序,文中通過實例代碼介紹的很詳細,有需要的朋友們可以參考借鑒。
    2016-09-09
  • SpringBoot找不到映射文件的處理方式

    SpringBoot找不到映射文件的處理方式

    這篇文章主要介紹了SpringBoot找不到映射文件的處理方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • Java利用redis zset實現延時任務詳解

    Java利用redis zset實現延時任務詳解

    zset作為redis的有序集合數據結構存在,排序的依據就是score。本文就將利用zset score這個排序的這個特性,來實現延時任務,感興趣的可以了解一下
    2022-08-08
  • Java線程中的Timer和TimerTask原理詳解

    Java線程中的Timer和TimerTask原理詳解

    這篇文章主要介紹了Java線程中的Timer和TimerTask原理詳解,Timer和TimerTask成對出現,Timer是定時器,TimerTask是定時任務,換句話說,定時任務TimerTask是給定時器Timer執(zhí)行的具體任務,需要的朋友可以參考下
    2023-10-10
  • java開發(fā)之SQL語句中DATE_FORMAT函數舉例詳解

    java開發(fā)之SQL語句中DATE_FORMAT函數舉例詳解

    要將日期值格式化為特定格式,請使用DATE_FORMAT函數,下面這篇文章主要給大家介紹了關于java開發(fā)之SQL語句中DATE_FORMAT函數的相關資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-05-05
  • java書店系統(tǒng)畢業(yè)設計 用戶模塊(3)

    java書店系統(tǒng)畢業(yè)設計 用戶模塊(3)

    這篇文章主要介紹了java書店系統(tǒng)畢業(yè)設計,第三步系統(tǒng)總體設計,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-10-10
  • Java字節(jié)與字符流永久存儲json數據

    Java字節(jié)與字符流永久存儲json數據

    本篇文章給大家詳細講述了Java字節(jié)與字符流永久存儲json數據的方法,以及代碼分享,有興趣的參考學習下。
    2018-02-02
  • SpringBoot2.X Kotlin系列之數據校驗和異常處理詳解

    SpringBoot2.X Kotlin系列之數據校驗和異常處理詳解

    這篇文章主要介紹了SpringBoot 2.X Kotlin系列之數據校驗和異常處理詳解,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-04-04

最新評論