SpringBoot3集成Kafka的方法詳解
一、簡(jiǎn)介
Kafka是一個(gè)開(kāi)源的分布式事件流平臺(tái),常被用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用,基于Zookeeper協(xié)調(diào)的處理平臺(tái),也是一種消息系統(tǒng),具有更好的吞吐量、內(nèi)置分區(qū)、復(fù)制和容錯(cuò),這使得它成為大規(guī)模消息處理應(yīng)用程序的一個(gè)很好的解決方案;
二、環(huán)境搭建
1、Kafka部署
1、下載安裝包:kafka_2.13-3.5.0.tgz
2、配置環(huán)境變量
open -e ~/.bash_profile
export KAFKA_HOME=/本地路徑/kafka3.5
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bash_profile
3、該目錄【kafka3.5/bin】啟動(dòng)zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties
4、該目錄【kafka3.5/bin】啟動(dòng)kafka
kafka-server-start.sh ../config/server.properties
2、Kafka測(cè)試
1、生產(chǎn)者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message
2、消費(fèi)者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message
3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic
4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message
3、可視化工具
配置和部署
1、下載安裝包:kafka-eagle-bin-3.0.2.tar.gz
2、配置環(huán)境變量
open -e ~/.bash_profile
export KE_HOME=/本地路徑/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/bin
source ~/.bash_profile
3、修改配置文件:system-config.properties
efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle
4、本地新建數(shù)據(jù)庫(kù):kafka-eagle,注意用戶名和密碼是否一致
5、啟動(dòng)命令
efak-web-3.0.2/bin/ke.sh start
命令語(yǔ)法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}
6、本地訪問(wèn)【localhost:8048】 username:admin password:123456
KSQL語(yǔ)句測(cè)試
select * from `test-topic` where `partition` in (0) order by `date` desc limit 5
select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3
三、工程搭建
1、工程結(jié)構(gòu)
2、依賴管理
這里關(guān)于依賴的管理就比較復(fù)雜了,首先spring-kafka
組件選擇與boot框架中spring相同的依賴,即6.0.10
版本,在spring-kafka
最近的版本中3.0.8
符合;
但是該版本使用的是kafka-clients
組件的3.3.2
版本,在Spring文檔的kafka模塊中,明確說(shuō)明spring-boot:3.1
要使用kafka-clients:3.4
,所以從spring-kafka
組件中排除掉,重新依賴kafka-clients
組件;
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka-clients.version}</version> </dependency>
3、配置文件
配置kafka連接地址,監(jiān)聽(tīng)器的消息應(yīng)答機(jī)制,消費(fèi)者的基礎(chǔ)模式;
spring: # kafka配置 kafka: bootstrap-servers: localhost:9092 listener: missing-topics-fatal: false ack-mode: manual_immediate consumer: group-id: boot-kafka-group enable-auto-commit: false max-poll-records: 10 properties: max.poll.interval.ms: 3600000
四、基礎(chǔ)用法
1、消息生產(chǎn)
模板類KafkaTemplate
用于執(zhí)行高級(jí)的操作,封裝各種消息發(fā)送的方法,在該方法中,通過(guò)topic
和key
以及消息主體,實(shí)現(xiàn)消息的生產(chǎn);
@RestController public class ProducerWeb { @Resource private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("/send/msg") public String sendMsg (){ try { // 構(gòu)建消息主體 JsonMapper jsonMapper = new JsonMapper(); String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg")); // 發(fā)送消息 kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody); } catch (JsonProcessingException e) { e.printStackTrace(); } return "OK" ; } }
2、消息消費(fèi)
編寫消息監(jiān)聽(tīng)類,通過(guò)KafkaListener
注解控制監(jiān)聽(tīng)的具體信息,在實(shí)現(xiàn)消息生產(chǎn)和消費(fèi)的方法測(cè)試后,使用可視化工具kafka-eagle
查看topic和消息列表;
@Component public class ConsumerListener { private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class); @KafkaListener(topics = "boot-kafka-topic") public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) { try { String key = String.valueOf(record.key()); String body = record.value(); log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body); } catch (Exception e){ e.printStackTrace(); } finally { acknowledgment.acknowledge(); } } }
五、參考源碼
文檔倉(cāng)庫(kù):
https://gitee.com/cicadasmile/butte-java-note
到此這篇關(guān)于SpringBoot3集成Kafka的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot3集成Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring的autowire-candidate設(shè)計(jì)
現(xiàn)在的Spring應(yīng)用通常都是基于注解開(kāi)發(fā),但是對(duì)Spring感興趣的同學(xué)可以借助Spring早期基于Xml配置的各種運(yùn)用來(lái)加深對(duì)Spring框架內(nèi)部的理解和體會(huì)Spring框架的設(shè)計(jì)之妙。這篇文章我們就來(lái)談?wù)刋ml配置之default-autowire-candidates2021-06-06Java8中l(wèi)ambda表達(dá)式的應(yīng)用及一些泛型相關(guān)知識(shí)
這篇文章主要介紹了Java8中l(wèi)ambda表達(dá)式的應(yīng)用及一些泛型相關(guān)知識(shí)的相關(guān)資料2017-01-01如何通過(guò)一張圖搞懂springBoot自動(dòng)注入原理
這篇文章主要給大家介紹了關(guān)于如何通過(guò)一張圖搞懂springBoot自動(dòng)注入原理的相關(guān)資料,文中通過(guò)圖文以及實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-02-02JavaWeb之Servlet注冊(cè)頁(yè)面的實(shí)現(xiàn)示例
注冊(cè)頁(yè)面是很多網(wǎng)站都會(huì)是使用的到,本文主要介紹了JavaWeb之Servlet注冊(cè)頁(yè)面的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04