spring-cloud-stream結(jié)合kafka使用詳解
1.pom文件導(dǎo)入依賴
<!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
2.application.yml文件配置
spring: cloud: stream: kafka: binder: brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中間件服務(wù)器地址 bindings: xxx_output: // 通道名稱 destination: xxx // 消息發(fā)往的目的地,對應(yīng)topic 在發(fā)送消息的配置里面,group是不用配置的 // 如果我們需要傳輸json的信息,那么在發(fā)送消息端需要設(shè)置content-type為json(其實(shí)可以不寫,默認(rèn)content-type就是json) xxx_input: destination: xxx // 消息發(fā)往的目的地,對應(yīng)topic group: xxx // 對應(yīng)kafka的group
3.創(chuàng)建消息發(fā)送者
@EnableBinding(Source.class) // @EnableBinding 是綁定通道的,Soure.class是spring 提供的,表示這是一個(gè)可綁定的發(fā)布通道 @Service public class MqService { @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) private MessageChannel oesWorkbenchChannel; /** * 發(fā)送一條kafka消息 */ public boolean sendLifeData(Object object) { return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT); } } // 發(fā)布通道 public interface Source { @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) MessageChannel oesWorkbenchLifeDataOutput(); // 發(fā)布通道用MessageChannel }
4.創(chuàng)建消息監(jiān)聽者
@Slf4j @EnableBinding(Sink.class) public class WorkbenchStreamListener { @Resource private FileService fileService; @StreamListener(KafkaConstants.xxx_input) // 監(jiān)聽接受通道 public void receiveData(MoveMessage moveMessage) { } } // 接受通道 public interface Sink { @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT) SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel }
接下來就可以愉快的發(fā)送監(jiān)聽消息了
到此這篇關(guān)于spring-cloud-stream結(jié)合kafka使用詳解的文章就介紹到這了,更多相關(guān)spring-cloud-stream整合kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 代理模式及動(dòng)態(tài)代理機(jī)制深入分析
這篇文章主要介紹了java 代理模式及動(dòng)態(tài)代理機(jī)制深入分析的相關(guān)資料, 代理是一種常用的設(shè)計(jì)模式,其目的就是為其他對象提供一個(gè)代理以控制對某個(gè)對象的訪問,需要的朋友可以參考下2017-03-03SpringBoot集成Spring Data JPA及讀寫分離
這篇文章主要介紹了SpringBoot集成Spring Data JPA及讀寫分離的相關(guān)知識,需要的朋友可以參考下2017-04-04淺談mybatis中的#和$的區(qū)別 以及防止sql注入的方法
下面小編就為大家?guī)硪黄獪\談mybatis中的#和$的區(qū)別 以及防止sql注入的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-10-10java中BeanUtils.copyProperties的用法(超詳細(xì))
本文介紹了BeanUtils.copyProperties()方法的使用,包括其功能、用法、注意事項(xiàng)和示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08基于SpringBoot和Vue實(shí)現(xiàn)分片上傳系統(tǒng)
最近想做一個(gè)關(guān)于文件上傳的個(gè)人小網(wǎng)盤,一開始嘗試使用了OSS的方案,但是該方案對于大文件來說并不友好,所以開始嘗試分片上傳方案的探索,接下來小編給大家詳細(xì)的介紹一下如何基于SpringBoot和Vue實(shí)現(xiàn)分片上傳系統(tǒng),需要的朋友可以參考下2023-12-12