SpringCloud中的Stream服務(wù)間消息傳遞詳解
一、Stream 的介紹
Stream 就是在消息隊(duì)列的基礎(chǔ)上,對(duì)其進(jìn)行封裝,可以是我們更方便的去使用。
Spring Cloud Stream應(yīng)用由第三方的中間件組成。應(yīng)用間的通信通過輸入通道(input channel)和輸出通道(output channel)完成。這些通道是有Spring Cloud Stream 注入的。而通道與外部的代理(可以理解為上文所說的數(shù)據(jù)中心)的連接又是通過Binder實(shí)現(xiàn)的。
二、Stream 的快速入門
2.1 編輯消費(fèi)者
2.1.1 導(dǎo)入相關(guān)依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2.1.2 編寫配置文件
spring: rabbitmq: host: 192.168.31.138 port: 5672 username: test password: test virtual-host: /test
2.1.3 聲明 channel(通道)
通過 @Input() 注解來指定所要聲明的通道。
public interface StreamClient { @Input("myMessage") SubscribableChannel input(); }
被 @Input 和@Output 注解的方法。其中 @Input 注解的方法返回的是 SubscribableChannel ,@Output 注解的方法返回的是 MessageChannel 。
聲明通道(channel)的方法就是使用 @Input 和 @Output 注解方法。你想要多少通道就注解多少方法。
默認(rèn)情況下,通道的名稱就是注解的方法的名稱,如果需要自己指定,只需要給這兩個(gè)注解傳遞 String 類型的參數(shù)即可。
使用@Input或者@Output注解聲明了通道(channel)的接口。Spring Cloud Stream會(huì)自動(dòng)實(shí)現(xiàn)這些接口。
2.1.4 創(chuàng)建和綁定 channel(通道)
使用 @EnableBinding 就能創(chuàng)建和綁定通道(channel)。
@SpringBootApplication @EnableEurekaClient @EnableBinding(StreamClient.class) public class SearchApplication { public static void main(String[] args) { SpringApplication.run(SearchApplication.class,args); } }
@EnableBinding 注解接收的參數(shù)就是使用 @Input 或者 @Output 注解聲明了通道(channel)的接口。
2.1.5 消費(fèi)消息
@StreamListener 接收的參數(shù)是要處理的通道(channel)的名,所注解的方法就是處理從通道獲取到的數(shù)據(jù)的方法。方法的參數(shù)就是獲取到的數(shù)據(jù)。
@Component public class StreamReceiver { @StreamListener("myMessage") public void msg(Object msg){ System.out.println("接收到消息:"+msg); } }
2.2 編輯生產(chǎn)者
2.2.1 導(dǎo)入相關(guān)依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2.2.2 編寫配置文件
spring: rabbitmq: host: 192.168.31.138 port: 5672 username: test password: test virtual-host: /test
2.2.3 聲明 channel(通道)
public interface StreamClient { @Output("myMessage") MessageChannel output(); }
2.2.4 創(chuàng)建和綁定
@SpringBootApplication ....... @EnableBinding(StreamClient.class) public class CustomerApplication { public static void main(String[] args) { SpringApplication.run(CustomerApplication.class,args); } ........ }
2.2.5 生產(chǎn)消息
@RestController public class MessageController { @Autowired private StreamClient streamClient; @GetMapping("/send") public String send(){ streamClient.output().send(MessageBuilder.withPayload("Hello stream!!!!!").build()); return "消息發(fā)送成功!"; } }
2.3 測(cè)試
三、Stream 重復(fù)消費(fèi)消息
避免一個(gè)消息被多個(gè)消費(fèi)者消費(fèi),只需要將多個(gè)消費(fèi)者指定為一個(gè)消費(fèi)者組即可。
**消費(fèi)組:**直觀的理解就是一群消費(fèi)者一起處理消息(每個(gè)發(fā)送到消費(fèi)組的數(shù)據(jù),僅由消費(fèi)組中的一個(gè)消費(fèi)者處理)。
spring: cloud: stream: bindings: myMessage: #指定channel group: customer #指定消費(fèi)者組
四、Stream 消費(fèi)者的手動(dòng) ACK
4.1 編寫配置文件
spring: cloud: stream: rabbit: bindings: myMessage: #指定 channel name consumer: acknowledgeMode: MANUAL # 指定規(guī)則默認(rèn) AUTO
4.2 修改消費(fèi)消息的方法
消息是帶有 Header 的,類似 Http 的 headler,我們可以通過 @Header 來獲取指定的 Header。
@Component public class StreamReceiver { @StreamListener("myMessage") public void msg(Object msg, @Header(name = AmqpHeaders.CHANNEL) Channel channel, @Header(name = AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException { System.out.println("接收到消息:"+msg); channel.basicAck(deliveryTag,false); } }
到此這篇關(guān)于SpringCloud中的Stream服務(wù)間消息傳遞詳解的文章就介紹到這了,更多相關(guān)SpringCloud的Stream內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringCloudStream原理和深入使用小結(jié)
- 使用Spring?Cloud?Stream處理事件的示例詳解
- spring-cloud-stream的手動(dòng)消息確認(rèn)問題
- SpringCloudStream中的消息分區(qū)數(shù)詳解
- 關(guān)于SpringCloudStream配置問題
- Spring Cloud Stream 高級(jí)特性使用詳解
- SpringCloud微服務(wù)開發(fā)基于RocketMQ實(shí)現(xiàn)分布式事務(wù)管理詳解
- SpringCloud+RocketMQ實(shí)現(xiàn)分布式事務(wù)的實(shí)踐
- Spring Cloud Stream整合RocketMQ的搭建方法
相關(guān)文章
SpringBoot實(shí)現(xiàn)過濾器攔截器的耗時(shí)對(duì)比
這篇文章主要為大家詳細(xì)介紹了SpringBoot實(shí)現(xiàn)過濾器攔截器的輸出接口耗時(shí)對(duì)比,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2022-06-06java實(shí)現(xiàn)航空用戶管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)航空用戶管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07Java的ConcurrentLinkedQueue源碼分析
這篇文章主要介紹了Java的ConcurrentLinkedQueue源碼分析,ConcurrentLinkedQueue 是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全的隊(duì)列,當(dāng)我們添加一個(gè)元素的時(shí)候,它會(huì)添加到隊(duì)列的尾部,當(dāng)我們獲取一個(gè)元素時(shí),它會(huì)返回隊(duì)列頭部的元素,需要的朋友可以參考下2023-12-12SpringMVC中使用Thymeleaf模板引擎實(shí)例代碼
這篇文章主要介紹了SpringMVC中使用Thymeleaf模板引擎實(shí)例代碼,分享了相關(guān)代碼示例,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-02-02