亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring Cloud Stream如何實現(xiàn)服務之間的通訊

 更新時間:2019年10月15日 11:37:59   作者:維晟  
這篇文章主要介紹了Spring Cloud Stream如何實現(xiàn)服務之間的通訊,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

Spring Cloud Stream

Srping cloud Bus的底層實現(xiàn)就是Spring Cloud Stream,Spring Cloud Stream的目的是用于構建基于消息驅動(或事件驅動)的微服務架構。Spring Cloud Stream本身對Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模塊進行封裝(整合)和擴展,下面我們實現(xiàn)兩個服務之間的通訊來演示Spring Cloud Stream的使用方法。

整體概述

服務要想與其他服務通訊要定義通道,一般會定義輸出通道和輸入通道,輸出通道用于發(fā)送消息,輸入通道用于接收消息,每個通道都會有個名字(輸入和輸出只是通道類型,可以用不同的名字定義很多很多通道),不同通道的名字不能相同否則會報錯(輸入通道和輸出通道不同類型的通道名稱也不能相同),綁定器是操作RabbitMQ或Kafka的抽象層,為了屏蔽操作這些消息中間件的復雜性和不一致性,綁定器會用通道的名字在消息中間件中定義主題,一個主題內的消息生產者來自多個服務,一個主題內消息的消費者也是多個服務,也就是說消息的發(fā)布和消費是通過主題進行定義和組織的,通道的名字就是主題的名字,在RabbitMQ中主題使用Exchanges實現(xiàn),在Kafka中主題使用Topic實現(xiàn)。

準備環(huán)境

創(chuàng)建兩個項目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我們用Spring Cloud Stream實現(xiàn)通訊,spring-cloud-stream-b我們用Spring Cloud Stream的底層模塊Spring Integration實現(xiàn)通訊。

兩個項目的POM文件依賴都是:

<dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-test-support</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

spring-cloud-stream-binder-rabbit是指綁定器的實現(xiàn)使用RabbitMQ。

項目配置內容application.properties:

spring.application.name=spring-cloud-stream-a
server.port=9010

#設置默認綁定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b
server.port=9011

#設置默認綁定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

啟動一個rabbitmq:

docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

編寫A項目代碼

在A項目中定義一個輸入通道一個輸出通道,定義通道在接口中使用@Input和@Output注解定義,程序啟動的時候Spring Cloud Stream會根據(jù)接口定義將實現(xiàn)類自動注入(Spring Cloud Stream自動實現(xiàn)該接口不需要寫代碼)。

A服務輸入通道,通道名稱ChatExchanges.A.Input,接口定義輸入通道必須返回SubscribableChannel:

public interface ChatInput {
  String INPUT = "ChatExchanges.A.Input";
  @Input(ChatInput.INPUT)
  SubscribableChannel input();
}

A服務輸出通道,通道名稱ChatExchanges.A.Output,輸出通道必須返回MessageChannel:

public interface ChatOutput {

  String OUTPUT = "ChatExchanges.A.Output";

  @Output(ChatOutput.OUTPUT)
  MessageChannel output();
}

定義消息實體類:

public class ChatMessage implements Serializable {

  private String name;
  private String message;
  private Date chatDate;

  //沒有無參數(shù)的構造函數(shù)并行化會出錯
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
  }
}

在業(yè)務處理類上用@EnableBinding注解綁定輸入通道和輸出通道,這個綁定動作其實就是創(chuàng)建并注冊輸入和輸出通道的實現(xiàn)類到Bean中,所以可以直接是使用@Autowired進行注入使用,另外消息的串行化默認使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解進行指定通道消息的監(jiān)聽:

//ChatInput.class的輸入通道不在這里綁定,監(jiān)聽到數(shù)據(jù)會找不到AClient類的引用。
//Input和Output通道定義的名字不能一樣,否則程序啟動會拋異常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {

  private static Logger logger = LoggerFactory.getLogger(AClient.class);

  @Autowired
  private ChatOutput chatOutput;

  //StreamListener自帶了Json轉對象的能力,收到B的消息打印并回復B一個新的消息。
  @StreamListener(ChatInput.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());

    ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

    chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
  }
}

到此A項目代碼編寫完成。

編寫B(tài)項目代碼

B項目使用Spring Integration實現(xiàn)消息的發(fā)布和消費,定義通道時我們要交換輸入通道和輸出通道的名稱:

public interface ChatProcessor {

  String OUTPUT = "ChatExchanges.A.Input";
  String INPUT = "ChatExchanges.A.Output";

  @Input(ChatProcessor.INPUT)
  SubscribableChannel input();

  @Output(ChatProcessor.OUTPUT)
  MessageChannel output();
}

消息實體類:

public class ChatMessage {
  private String name;
  private String message;
  private Date chatDate;

  //沒有無參數(shù)的構造函數(shù)并行化會出錯
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("聊天消息:%s的時候,%s說%s。",this.chatDate,this.name,this.message);
  }
}

業(yè)務處理類用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解發(fā)布消息:

@EnableBinding(ChatProcessor.class)
public class BClient {

  private static Logger logger = LoggerFactory.getLogger(BClient.class);

  //@ServiceActivator沒有Json轉對象的能力需要借助@Transformer注解
  @ServiceActivator(inputChannel=ChatProcessor.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());
  }

  @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
  public ChatMessage transform(String message) throws Exception{
    ObjectMapper objectMapper = new ObjectMapper();
    return objectMapper.readValue(message,ChatMessage.class);
  }

  //每秒發(fā)出一個消息給A
  @Bean
  @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
  public GenericMessage<ChatMessage> SendChatMessage(){
    ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
    GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
    return gm;
  }
}

運行程序

啟動A項目和B項目:


源碼

Github倉庫:https://github.com/sunweisheng/spring-cloud-example

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關文章

  • Java 運算符 動力節(jié)點Java學院整理

    Java 運算符 動力節(jié)點Java學院整理

    這篇文章主要介紹了Java 運算符 動力節(jié)點Java學院整理,需要的朋友可以參考下
    2017-04-04
  • MybatisPlus的MetaObjectHandler與@TableLogic使用

    MybatisPlus的MetaObjectHandler與@TableLogic使用

    這篇文章主要介紹了MybatisPlus的MetaObjectHandler與@TableLogic使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • 詳解Spring Boot 項目啟動時執(zhí)行特定方法

    詳解Spring Boot 項目啟動時執(zhí)行特定方法

    這篇文章主要介紹了詳解Spring Boot 項目啟動時執(zhí)行特定方法,Springboot給我們提供了兩種“開機啟動”某些方法的方式:ApplicationRunner和CommandLineRunner。感興趣的小伙伴們可以參考一下
    2018-06-06
  • SpringBoot獲取配置文件內容的幾種方式總結

    SpringBoot獲取配置文件內容的幾種方式總結

    大家都知道SpringBoot獲取配置文件的方法有很多,下面這篇文章主要給大家介紹了關于SpringBoot獲取配置文件內容的幾種方式,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2023-02-02
  • java編程實現(xiàn)求解八枚銀幣代碼分享

    java編程實現(xiàn)求解八枚銀幣代碼分享

    這篇文章主要介紹了java編程實現(xiàn)求解八枚銀幣代碼分享,具有一定參考價值,需要的朋友可以了解下。
    2017-11-11
  • SpringCloud微服務開發(fā)基于RocketMQ實現(xiàn)分布式事務管理詳解

    SpringCloud微服務開發(fā)基于RocketMQ實現(xiàn)分布式事務管理詳解

    分布式事務是在微服務開發(fā)中經常會遇到的一個問題,之前的文章中我們已經實現(xiàn)了利用Seata來實現(xiàn)強一致性事務,其實還有一種廣為人知的方案就是利用消息隊列來實現(xiàn)分布式事務,保證數(shù)據(jù)的最終一致性,也就是我們常說的柔性事務
    2022-09-09
  • 你什么是Elastic Stack(ELK)

    你什么是Elastic Stack(ELK)

    這篇文章主要介紹了你什么是Elastic Stack(ELK),ELK是三款軟件的簡稱,分別是Elasticsearch、Logstash、Kibana組成,需要的朋友可以參考下
    2023-04-04
  • SpringMvc @Valid如何拋出攔截異常

    SpringMvc @Valid如何拋出攔截異常

    這篇文章主要介紹了SpringMvc @Valid如何拋出攔截異常,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • java?jar包后臺運行的兩種方式詳解

    java?jar包后臺運行的兩種方式詳解

    后臺運行jar的方法有多種方法可以實現(xiàn)Java后臺運行jar文件,下面介紹其中兩種常見的方法,下面這篇文章主要給大家介紹了關于java?jar包后臺運行的兩種方式,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-07-07
  • java String[]字符串數(shù)組自動排序的簡單實現(xiàn)

    java String[]字符串數(shù)組自動排序的簡單實現(xiàn)

    下面小編就為大家?guī)硪黄猨ava String[]字符串數(shù)組自動排序的簡單實現(xiàn)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-09-09

最新評論