最新SpringCloud?Stream消息驅(qū)動講解
SpringCloud Stream消息驅(qū)動
1、SpringCloud Stream概述
官方地址:https://spring.io/projects/spring-cloud-stream#overview
中文指導(dǎo)手冊地址:https://m.wang1314.com/doc/webapp/topic/20971999.html
SpringCloud Stream 是一個(gè)構(gòu)建消息驅(qū)動微服務(wù)的框架
應(yīng)用程序通過 outputs 或 inputs 來與 SpringCloud Stream 中的 binder 對象交互
SpringCloud Stream 中的 binder 對象負(fù)責(zé)與消息中間件交互
通過 SpringCloud Stream 連接消息中間件,以實(shí)現(xiàn)消息事件驅(qū)動
什么是SpringCloudStream官方定義 Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動微服務(wù)的框架。
應(yīng)用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream中binder對象交互。通過我們配置binding(綁定) ,而 Spring Cloud Stream 的 binder對象負(fù)責(zé)與消息中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動的方式。
通過使用Spring Integration來連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動。Spring Cloud Stream 為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動化配置實(shí)現(xiàn),引用了發(fā)布-訂閱、消費(fèi)組、分區(qū)的三個(gè)核心概念。
目前僅支持RabbitMQ、Kafka。
1.1、設(shè)計(jì)思想
1、標(biāo)注的MQ流程
生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容【massage】
消息必須走特定的通道【消息通道MessageChannel】
消息通道里的消息如何被消費(fèi)呢,誰負(fù)責(zé)收發(fā)處理
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器所訂閱
2、Cloud Stream的作用
比方說我們用到了RabbitMQ和Kafka,由于這兩個(gè)消息中間件的架構(gòu)上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū)。
這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開發(fā)給我們造成了一定的困擾,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種,后面的業(yè)務(wù)需求,我想往另外一種消息隊(duì)列進(jìn)行遷移,這時(shí)候無疑就是一個(gè)災(zāi)難性的,一大堆東西都要重新推倒重新做,因?yàn)樗覀兊南到y(tǒng)耦合了,這時(shí)候springcloud Stream給我們提供了一種解耦合的方式。
3、什么是Binder
在沒有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會有較大的差異性通過定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。
通過向應(yīng)用程序暴露統(tǒng)一的Channel通道,使得應(yīng)用程序不需要再考慮各種不同的消息中間件實(shí)現(xiàn)。
Binder可以生成Binding,Binding用來綁定消息容器的生產(chǎn)者和消費(fèi)者,它有兩種類型,INPUT和OUTPUT,INPUT對應(yīng)于消費(fèi)者,OUTPUT對應(yīng)于生產(chǎn)者。 4、Stream中的消息通信方式遵循了發(fā)布-訂閱模式
使用Topic主題進(jìn)行廣播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
1.2、標(biāo)準(zhǔn)的流程套路
1、Binder:很方便的連接中間件,屏蔽不同的差異
2、Channel
通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲和轉(zhuǎn)發(fā)的媒介,通過Channel對隊(duì)列進(jìn)行配置
3、Source和Sink
簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。
1.3、編碼API和常用注解
組成和注解 | 描述 |
---|---|
Middleware | 中間件,目前只支持RabbitM和Kafka |
Binder | Binder是應(yīng)用與消息中間的封裝,目前實(shí)現(xiàn)了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態(tài)的改變消息類型(對應(yīng)Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實(shí)現(xiàn) |
@Input | 注解標(biāo)識輸入通道,通過該輸入通道接收到的消息進(jìn)入應(yīng)用程序 |
@Output | 注解標(biāo)識輸出通道,發(fā)布的消息將通過通道離開應(yīng)用程序 |
@StreamListener | 監(jiān)聽隊(duì)列,用戶消費(fèi)者的隊(duì)列的消息接收 |
@EnableBinding | 指通道channel和exchange綁定在一起 |
2、消息驅(qū)動之生產(chǎn)者(output)
2.1、新建模塊cloud-stream-rabbitmq-provider8801
2.2、引入pom.xml配置文件
如果是需要Stream整合的就將依賴改為
spring-cloud-starter-stream-kafka
<dependencies> <!--stream整合rabbit依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基礎(chǔ)配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.3、YAML配置文件
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息; defaultRabbit: # 表示定義的名稱,用于于binding整合 type: rabbit # 消息組件類型 environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務(wù)的整合處理 output: # 這個(gè)名字是一個(gè)通道的名稱,消息生產(chǎn)者 destination: studyExchange # 表示要使用的Exchange名稱定義【自定義】 content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain” binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置【上面的配置】 eureka: client: # 客戶端進(jìn)行Eureka注冊的配置 service-url: defaultZone: http://localhost:7001/eureka
2.4、生產(chǎn)者啟動類
package com.zcl.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * 描述:消息生產(chǎn)者啟動類 * * @author zhong * @date 2022-09-22 12:19 */ @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
2.5、業(yè)務(wù)實(shí)現(xiàn)
2.5.1、服務(wù)接口實(shí)現(xiàn)類
自己創(chuàng)建一個(gè)實(shí)現(xiàn)的接口以及里面的方法
注意:在這個(gè)服務(wù)實(shí)現(xiàn)類里面不是使用
@Service
注解了,因?yàn)椴皇莣eb應(yīng)用,而是Stream消息驅(qū)動,是與中間件進(jìn)行打交道的不是與數(shù)據(jù)庫
package com.zcl.springcloud.service.Impl; import com.zcl.springcloud.service.IMessageProvider; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; import java.util.UUID; /** * 描述:發(fā)送接口實(shí)現(xiàn)類 * 必須使用@EnableBinding(Source.class)注解開啟消息推送管道 * * @author zhong * @date 2022-09-22 12:24 */ @Slf4j @EnableBinding(Source.class) public class IMessageProviderImpl implements IMessageProvider { /** * 消息發(fā)送管道 */ @Resource private MessageChannel output; /** * 發(fā)送消息 * @return */ @Override public String send() { // 定義消息 String serial = UUID.randomUUID().toString(); // 構(gòu)建并發(fā)送消息 this.output.send(MessageBuilder.withPayload(serial).build()); log.info("-------------- " + serial + " ----------------"); return serial; } }
2.5.2、控制器實(shí)現(xiàn)
package com.zcl.springcloud.controller; import com.zcl.springcloud.service.IMessageProvider; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 描述:消息發(fā)送控制器 * * @author zhong * @date 2022-09-22 12:37 */ @RestController public class SendMessageController { /** * 注入消息發(fā)送管道接口 */ @Resource private IMessageProvider messageProvider; /** * 每調(diào)用一次接口發(fā)送一次消息 * @return */ @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); } }
2.6、啟動測試
- 啟動7001Eureka訪問中心
- 啟動8801消息發(fā)送者,啟動成功以及觀察RabbitMQ的管理界面
3.訪問接口發(fā)送消息,查看MQ的管理頁面波峰情況
3、消息驅(qū)動之消費(fèi)者(input)
同樣的參考如下流程圖
3.1、新建cloud-stream-rabbitmq-consumer8802模塊
3.2、引入pom.xml依賴
與8801一樣
3.3、添加YAML配置文件
配置文件與消息生產(chǎn)的區(qū)別在于:
output: # 這個(gè)名字是一個(gè)通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息; defaultRabbit: # 表示定義的名稱,用于于binding整合 type: rabbit # 消息組件類型 environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務(wù)的整合處理 input: # 這個(gè)名字是一個(gè)通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設(shè)置消息類型,本次為對象json,如果是文本則設(shè)置“text/plain” binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置 eureka: client: # 客戶端進(jìn)行Eureka注冊的配置 service-url: defaultZone: http://localhost:7001/eureka
3.4、添加啟動類StreamMQMain8802
與消息生產(chǎn)者一樣
3.5、業(yè)務(wù)實(shí)現(xiàn)
必須要有
@Component
注解注入到Spring容器中
package com.zcl.springcloud.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * 描述:消息消費(fèi)者控制器 * * @author zhong * @date 2022-09-22 13:18 */ @Slf4j @Component @EnableBinding(Sink.class) public class ReceiveMessageListener { /** * 注入消費(fèi)者的端口號 */ @Value("${server.port}") private String port; /** * 監(jiān)聽消息 * @param message * @return */ @StreamListener(Sink.INPUT) public void input(Message<String> message){ log.info("消費(fèi)者1號接收到的消息 ----- " + message.getPayload() + " -----,port: " + port); } }
3.6、啟動項(xiàng)目測試
- 啟動7001
- 啟動8801,消息發(fā)送者
- 啟動8802,消息消費(fèi)者
- 8801發(fā)送消息,8802消費(fèi)消息,并查看具體的MQ波峰圖
控制器輸出
4、分組消費(fèi)與持久化
4.1、完整參考cloud-stream-rabbitmq-consumer8802,創(chuàng)建8803項(xiàng)目
除了啟動的端口號不一樣之外其他的配置都一樣
4.2、啟動項(xiàng)目發(fā)現(xiàn)問題
- 啟動7001(Eureka服務(wù)中心)
- 啟動8801(生產(chǎn))、8802(消費(fèi))、8803(消費(fèi))
- 測試發(fā)送消失是否兩個(gè)消費(fèi)者都可以接收到
4.2.1、重復(fù)消費(fèi)
目前是8802/8803同時(shí)都收到了,存在重復(fù)消費(fèi)問題
解決方案:分組和持久化屬性group
常見案例
比如在如下場景中,訂單系統(tǒng)我們做集群部署,都會從RabbitMQ中獲取訂單信息,那如果一個(gè)訂單同時(shí)被兩個(gè)服務(wù)獲取到,那么就會造成數(shù)據(jù)錯誤,我們得避免這種情況。這時(shí)我們就可以使用Stream中的消息分組來解決
注意在Stream中處于同一個(gè)group中的多個(gè)消費(fèi)者是競爭關(guān)系,就能夠保證消息只會被其中一個(gè)應(yīng)用消費(fèi)一次。
不同組是可以全面消費(fèi)的(重復(fù)消費(fèi)),同一組內(nèi)會發(fā)生競爭關(guān)系,只有其中一個(gè)可以消費(fèi)。
4.2.2、分組
自定義配置分組,自定義分為同一個(gè)組,解決重復(fù)消費(fèi)問題
配置文件分組
分別給8801、8802進(jìn)行分組【orderA】
重啟項(xiàng)目查看MQ管理
orderB是歷史記錄,上面的配置以及都分為了
ordeerA
組,進(jìn)入orderA組可以查看實(shí)際的消費(fèi)者數(shù)量同一組內(nèi)會發(fā)生競爭關(guān)系,只有其中一個(gè)可以消費(fèi),啟動項(xiàng)目測試是否為真
4.2.3、持久化
通過上述,解決了重復(fù)消費(fèi)問題,再看看持久化
停止8802/8803并去除掉8802的分組group: atguiguA,8803保留
8801先發(fā)送7條消息到rabbitmq
3.先啟動8802,無分組屬性配置,后臺沒有打出來消息
8802因?yàn)槿∠?code>groupA的分組所以獲取不到持久化的數(shù)據(jù)(如果重啟mq也會消失)
4.再啟動8803,有分組屬性配置,后臺打出來了MQ上的消息
8803保存
groupA
的分組所以在啟動的時(shí)候就會將持久化的數(shù)據(jù)消費(fèi)
到此這篇關(guān)于SpringCloud Stream消息驅(qū)動的文章就介紹到這了,更多相關(guān)SpringCloud Stream消息驅(qū)動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- spring-integration連接MQTT全過程
- Spring?Integration概述與怎么使用詳解
- 如何使用Spring?integration在Springboot中集成Mqtt詳解
- 源碼解讀Spring-Integration執(zhí)行過程
- Springcloud Stream消息驅(qū)動工具使用介紹
- Spring?Cloud?Stream消息驅(qū)動組件使用方法介紹
- Springcloud整合stream,rabbitmq實(shí)現(xiàn)消息驅(qū)動功能
- SpringCloud Stream消息驅(qū)動實(shí)例詳解
- Spring Integration 實(shí)現(xiàn)消息驅(qū)動的詳細(xì)步驟
相關(guān)文章
java使用http實(shí)現(xiàn)文件下載學(xué)習(xí)示例
這篇文章主要介紹了java使用http實(shí)現(xiàn)文件下載學(xué)習(xí)示例,需要的朋友可以參考下2014-04-04@Autowired注解注入的xxxMapper報(bào)錯問題及解決
這篇文章主要介紹了@Autowired注解注入的xxxMapper報(bào)錯問題及解決,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請求的操作
這篇文章主要介紹了解決Spring Cloud Gateway獲取body內(nèi)容,不影響GET請求的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12SpringCloud hystrix服務(wù)降級概念介紹
什么是服務(wù)降級?當(dāng)服務(wù)器壓力劇增的情況下,根據(jù)實(shí)際業(yè)務(wù)情況及流量,對一些服務(wù)和頁面有策略的不處理或換種簡單的方式處理,從而釋放服務(wù)器資源以保證核心交易正常運(yùn)作或高效運(yùn)作2022-09-09Spring Security單項(xiàng)目權(quán)限設(shè)計(jì)過程解析
這篇文章主要介紹了Spring Security單項(xiàng)目權(quán)限設(shè)計(jì)過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11使用SpringBoot開發(fā)Restful服務(wù)實(shí)現(xiàn)增刪改查功能
Spring Boot是由Pivotal團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來簡化新Spring應(yīng)用的初始搭建以及開發(fā)過程。這篇文章主要介紹了基于SpringBoot開發(fā)一個(gè)Restful服務(wù),實(shí)現(xiàn)增刪改查功能,需要的朋友可以參考下2018-01-01