SpringCloud Stream消息驅(qū)動實例詳解
1. 消息驅(qū)動概述
1.1 是什么
在實際應(yīng)用中有很多消息中間件,比如現(xiàn)在企業(yè)里常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,學習所有這些消息中間件無疑需要大量時間經(jīng)歷成本,那有沒有一種技術(shù),使我們不再需要關(guān)注具體的消息中間件的細節(jié),而只需要用一種適配綁定的方式,自動的在各種消息中間件內(nèi)切換呢?消息驅(qū)動就是這樣的技術(shù),它能 屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型。
SpringCloud Stream是一個構(gòu)件消息驅(qū)動微服務(wù)的框架。應(yīng)用程序通過inputs和outputs來與SpringCloud Stream中的綁定器(binder)對象交互,通過配置來綁定,而SpringCloud Stream的綁定器對象負責與消息中間件交互,所以,我們只需要搞清楚如何與SpringCloud Stream交互就可以方便使用消息驅(qū)動的方式。但是 截至到目前 SpringCloud Stream僅支持RabbitMQ和Kafka。
1.2 設(shè)計思想
標準MQ模型
- 生產(chǎn)者 / 消費者之間靠消息媒介傳遞信息內(nèi)容 -
Messag
- 消息必須走特定的通道 -
Message Channel
- 消息通道里的消息如何被消費呢?誰負責處理? - 消息通道
MessageChannel
的子接口SubscribableChannel
,由 MessageHandler 消息處理器所訂閱
為什么使用Cloud Stream
比如說我們用到了RabbitMQ和Kafka,由于這兩個消息中間件的架構(gòu)上的不同,像RabbitMQ有exchange,Kafka有Topic和Partitions分區(qū),這些中間件的差異性導(dǎo)致實際項目開發(fā)給我們造成了一定的困擾,我們?nèi)绻昧藘蓚€消息隊列的其中一種,后面的業(yè)務(wù)需求如果又要往另外一種消息隊列進行遷移,這無疑是一個災(zāi)難,一大堆東西都要重新推到重做,因為它跟我們的系統(tǒng)耦合了,這時候SpringCloud Stream給我們提供了一種解耦合的方式。
stream憑什么可以統(tǒng)一底層差異
在沒有綁定器這個概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進行信息交互的時候,由于各消息中間件構(gòu)建的初衷不同,它們的實現(xiàn)細節(jié)上會有較大的差異性。
通過定義綁定器作為中間層,完美的實現(xiàn)了 應(yīng)用程序與消息中間件細節(jié)之間的隔離。Stream對消息中間件的進一步封裝(通過向應(yīng)用程序暴露統(tǒng)一的Channel通道,使得應(yīng)用程序不需要再考慮各種不同的消息中間件實現(xiàn)),可以做到代碼層面對中間件的無感知,甚至于動態(tài)的切換中間件(如RabbitMQ切換為Kafka),使得微服務(wù)開發(fā)的高度解耦,服務(wù)可以更多的關(guān)注自己的業(yè)務(wù)流程。
在消息綁定器中,INPUT對應(yīng)于消費者,OUTPUT對應(yīng)于生產(chǎn)者。
Stream中的消息通信方式遵循了 發(fā)布-訂閱模式,用Topic(主題)進行廣播(RabbitMQ中對應(yīng)于Exchange交換機,Kafka中就是Topic)。
1.3 SpringCloud Stream標準流程套路
Binder
很方便的連接中間件,屏蔽差異Channel
通道,是隊列Queue的一種抽象,在消息通訊系統(tǒng)中就是實現(xiàn)了存儲和轉(zhuǎn)發(fā)的媒介,通過Channel對隊列進行配置Source
和Sink
簡單的可以理解為參照對象是SpringCloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入
1.4 SpringCloud Stream編碼API與常用注解
組成 | 說明 |
---|---|
Middleware | 中間件,目前只支持RabbitMQ和Kafka |
Binder | Binder是應(yīng)用與消息中間件之間的封裝,目前實行了RabbitMQ和Kafka的Binder,通過Binder可以很方便的連接中間件,可以動態(tài)的改變消息類型(對應(yīng)于Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現(xiàn) |
@Input | 注解標識輸入通道,通過該輸入通道接收到的消息進入應(yīng)用程序 |
@Output | 注解標識輸出通道,發(fā)布的消息將通過該通道離開應(yīng)用程序 |
@StreamListner | 監(jiān)聽隊列,用于消費者的隊列的消息接收 |
@EnableBinding | 使信道Channel和交換機/主題(Exchange/Topic)綁定在一起 |
2. Spring Cloud Stream 案例
新建三個子模塊分別對應(yīng)于消息的生產(chǎn)者和消費者:
模塊名 | 微服務(wù)功能 |
---|---|
cloud-stream-rabbitmq-provider8801 | 生產(chǎn)者,發(fā)送消息模塊 |
cloud-stream-rabbitmq-consumer8802 | 消費者,接收消息模塊 |
cloud-stream-rabbitmq-consumer8803 | 消費者,接收消息模塊 |
2.1 消息驅(qū)動之消息生產(chǎn)者
新建Module:cloud-stream-rabbitmq-provider8801作為消息的生產(chǎn)者用來發(fā)送消息,在其POM文件中除引入web、actuator、eureka-client等必要啟動器外,還需要引入SpringCloud Stream對應(yīng)實現(xiàn)RabbitMQ的啟動器依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
編寫其配置文件application.yml:
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息 defaultRabbit: # 表示定義的名稱,用于于binding整合 type: rabbit # 消息組件類型 environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置 spring: rabbitmq: host: mpolaris.top port: 5672 username: admin password: 1234321 bindings: # 服務(wù)的整合處理 output: # 這個名字是一個通道的名稱,OUTPUT表示這是消息的發(fā)送方 # 表示要使用的Exchange名稱定義 destination: testExchange # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain” content-type: application/json # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置 default-binder: defaultRabbit eureka: client: # 客戶端進行Eureka注冊的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka instance: # 設(shè)置心跳的時間間隔(默認是30秒) lease-renewal-interval-in-seconds: 2 # 如果現(xiàn)在超過了5秒的間隔(默認是90秒) lease-expiration-duration-in-seconds: 5 # 在信息列表時顯示主機名稱yml instance-id: send-8801.com # 訪問的路徑變?yōu)镮P地址 prefer-ip-address: true
編寫其主啟動類
編寫業(yè)務(wù)類,在業(yè)務(wù)類中分別要編寫 發(fā)送消息接口 及其 實現(xiàn)類,并在發(fā)送接口消息的實現(xiàn)類中 添加 @EnableBinding
注解 用來綁定消息的推送管道,消息生產(chǎn)者綁定的消息推送管道為 org.springframework.cloud.stream.messaging.Source
:
public interface IMessageProvider { public String send(); }
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; /** * @Author polaris * @Date 2021/3/4 21:46 */ @EnableBinding(Source.class) //定義消息的推送管道 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; //消息發(fā)送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); //發(fā)送消息 System.out.println("==> serial:" + serial); return null; } }
注意我們在service的實現(xiàn)類中不再需要@Service
注解,因為這個service不再是傳統(tǒng)意義上的和Controller、DAO數(shù)據(jù)等進行交互的service,而是要綁定綁定器打交道的service。
然后編寫其業(yè)務(wù)層的Controller:
@RestController public class SendMessageController { @Autowired private IMessageProvider messageProvider; @GetMapping("/sendMessage") public String sendMessage() { return messageProvider.send(); } }
啟動服務(wù)注冊中心后和RabbitMQ后,啟動消息生產(chǎn)者微服務(wù),我們在RabbitMQ的控制面板中可以看見多出了一個名為testExchange的交換機,這個交換機恰恰就是我們之前在配置文件中配置的交換機名字testExchange。
然后我們訪問 http://localhost:8801/sendMessage 使用消息生產(chǎn)者微服務(wù)發(fā)送消息,在其微服務(wù)后臺我們看到了打印的消息。
在RabbitMQ的控制面板中我們也看到了確實發(fā)送了消息。
2.2 消息驅(qū)動之消息消費者
新建Module:cloud-stream-rabbitmq-consumer8802/8803作為消息的生產(chǎn)者用來接收消息,其POM文件中引入的啟動器依賴和消息生產(chǎn)者微服務(wù)的依賴幾乎相同,然后編寫其配置文件application.yml,其配置文件的書寫和消息生產(chǎn)者的幾乎一致,特別需要注意的是,消息生產(chǎn)者微服務(wù)用到的通道為OUTPUT,而消息消費者微服務(wù)用到的通道為INPUT,其他的配置文件信息就只需要注意端口號、注冊服務(wù)名的區(qū)別即可:
spring: cloud: bindings: input: # 這個名字是一個通道的名稱,INPUT表示消息消費者
編寫主啟動類
編寫消息消費者的業(yè)務(wù)類,由于是消費者,所以只需要編寫其Controller即可,在其Controller上同樣需要添加 @EnableBinding
注解用來綁定消息的推送管道,消息消費者綁定的消息推送管道為import org.springframework.cloud.stream.messaging.Sink
,在接收消息的方法中需要使用 @StreamListner
注解來監(jiān)聽其綁定的消息推送管道:
@Component @EnableBinding(Sink.class) public class ReceiveMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消費者" + serverPort + "號,收到消息:" + message.getPayload()); } }
然后啟動消息發(fā)送消費者服務(wù),用生產(chǎn)者發(fā)送消息,我們可以發(fā)現(xiàn)在消費者端可以成功接收到消息。
3. 分組消費和持久化
3.1 重復(fù)消費問題
當生產(chǎn)者發(fā)送消息后,此時的我們的消費者都接受了消息并進行了消費,也就是說同一條消息被多個消息消費者所消費。
上述的問題就是消息的 重復(fù)消費 問題,那么這個問題為什么如此重要呢?其實重復(fù)消費這個問題本身不可怕,可怕的是沒考慮到重復(fù)消費之后,怎么保證冪等性。(冪等性 通俗的說,就一個數(shù)據(jù),或者一個請求,重復(fù)很多次,需要確保對應(yīng)的數(shù)據(jù)是不會改變的,不能出錯)。分布式微服務(wù)應(yīng)用為了實現(xiàn)高可用和負載均衡,實際上同一功能的服務(wù)都會部署多個具體的服務(wù)實例。舉個例子,假設(shè)有一個系統(tǒng),有一條消息要求往數(shù)據(jù)庫里插入一條數(shù)據(jù),要是這個消息重復(fù)消費兩次,結(jié)果就是向數(shù)據(jù)庫里插入了兩條數(shù)據(jù),這樣數(shù)據(jù)就錯了,就違背了冪等性原則,但是要是該消息消費到第二次的時候,可以判斷一下已經(jīng)消費過了,然后直接將該消息丟棄,這就實現(xiàn)了只插入一條數(shù)據(jù),一條消息重復(fù)出現(xiàn)了兩次,但是只有第一次真正被消費了,數(shù)據(jù)庫里也就只插入了一條數(shù)據(jù),這就保證了系統(tǒng)的冪等性。
上面簡單的介紹了消息的重復(fù)消費問題,那如何解決這種重復(fù)消費問題呢,那就需要我們進行 分組和持久化屬性組 操作,利用SpringCloud Stream中的消息分組來解決這個問題,需要注意的是在Stream中處于同一組中的多個消息消費者是競爭關(guān)系,也就是保證生產(chǎn)者所發(fā)送的同一個消息只會被其中一個消費者消費一次。 不同組的消費者是可以對消息進行全面消費(重復(fù)消費)的,只有同一組內(nèi)才會發(fā)生競爭關(guān)系。
在RabbitMQ中,默認分組group是不同的,組流水號不一樣,被認為不同組,我們查看testExchange交換機,可以發(fā)現(xiàn)8802和8803兩個消息消費者處于不同的組,所以8801消息生產(chǎn)者發(fā)送的消息可以被這兩個消費者重復(fù)消費:
3.2 分組解決重復(fù)消費問題
上面在RabbitMQ控制面板中我們看到的組流水號是系統(tǒng)隨機分配的,這樣無疑不好控制,所以我們應(yīng)該自定義配置分組,將8802/8803兩個消息消費者微服務(wù)分為同一個組,以此來解決消息的重復(fù)消費問題。
先來演示如何自定義分組
在8802/8803微服務(wù)中的配置文件中分別添加組名屬性:
spring: cloud: stream: bindings: input: group: A/B # 分組名稱
這里我們將8802設(shè)置為A組,8803設(shè)置為B組,然后我們將消息消費方的兩個微服務(wù)重啟,我們再次查看其組流水號,發(fā)現(xiàn)不再是長長的隨機組流水號,而變成了我們自定義的分組:
此時由于8802/8803位于兩個不同分組下,所以沒有競爭關(guān)系,消息生產(chǎn)者發(fā)送消息后,仍然可以重復(fù)消費。
下面我們將這兩個消息消費方微服務(wù)分到相同的消費組中,這樣每次就只有一個消費者,消息生產(chǎn)者發(fā)送的消息只能被8802或8803其中一個接受到,這樣就避免了重復(fù)消費,將8802和8803的分組名都改為A,再次重啟兩個消息消費方微服務(wù),此時我們可以看到在分組A下已經(jīng)有了兩個消費者。
再用生產(chǎn)者發(fā)送5條消息,我們發(fā)現(xiàn)8802/8803分別消費了3條和2條不同的消息,而沒有出現(xiàn)重復(fù)消費的問題。
3.3 持久化
通過上述,解決了重復(fù)消費問題,再來看看持久化
加上了group就自動支持持久化了
下面來演示一下持久化
- 停止8802/8803并去除掉8802分組group:A(8803的分組group A沒有去掉)
- 8801發(fā)送4條消息到rabbitmq
- 先啟動8802(無分組屬性配置),后臺沒有打出來消息(消息丟失故障)
- 再啟動8803(有分組屬性配置),后臺打出了4條消息(消費持久化消息)
到此這篇關(guān)于SpringCloud Stream消息驅(qū)動的文章就介紹到這了,更多相關(guān)SpringCloud Stream消息驅(qū)動內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Flutter ListView 上拉加載更多下拉刷新功能實現(xiàn)方法
這篇文章主要介紹了Flutter ListView 上拉加載更多下拉刷新功能實現(xiàn)方法,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-07-07Json字符串轉(zhuǎn)Java對象和List代碼實例
這篇文章主要介紹了Json字符串轉(zhuǎn)Java對象和List代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-06-06