Spring Boot集成Apache Kafka的實(shí)戰(zhàn)指南
Apache Kafka 是一個分布式流處理平臺,廣泛用于構(gòu)建實(shí)時數(shù)據(jù)管道、日志聚合系統(tǒng)和事件溯源架構(gòu)。Spring Boot 提供了對 Kafka 的良好集成支持,使得開發(fā)者可以非常便捷地在項(xiàng)目中使用 Kafka。
本文將手把手教你如何在 Spring Boot 項(xiàng)目中集成 Kafka,包括生產(chǎn)者(Producer)和消費(fèi)者(Consumer)的實(shí)現(xiàn),并提供完整的代碼示例。
開發(fā)環(huán)境準(zhǔn)備
Java 17+
Maven 或 Gradle
Spring Boot 3.x
Apache Kafka 3.0+(本地或遠(yuǎn)程)
IDE(如 IntelliJ IDEA、VS Code)
創(chuàng)建 Spring Boot 項(xiàng)目
你可以通過 Spring Initializr 創(chuàng)建一個新的 Spring Boot 項(xiàng)目,選擇以下依賴:
- Spring Web
- Spring for Apache Kafka
或者手動添加 pom.xml 中的依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
Spring Boot 會自動管理版本兼容性,無需手動指定版本號。
配置 Kafka 連接信息
在 application.yml 或 application.properties 文件中配置 Kafka 相關(guān)參數(shù):
application.yml 示例:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
編寫 Kafka 生產(chǎn)者(Producer)
創(chuàng)建一個服務(wù)類用于發(fā)送消息到 Kafka 主題。
KafkaProducer.java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); System.out.println("Sent message: " + message); } }
編寫 Kafka 消費(fèi)者(Consumer)
使用 @KafkaListener 注解監(jiān)聽特定主題的消息。
KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { @KafkaListener(topics = "test-topic", groupId = "my-group") public void listen(ConsumerRecord<String, String> record) { System.out.printf("Received message: topic - %s, partition - %d, offset - %d, key - %s, value - %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } }
添加 REST 接口用于測試發(fā)送消息
為了方便測試,我們可以創(chuàng)建一個簡單的 REST 控制器來觸發(fā)消息發(fā)送。
KafkaController.java
import org.springframework.web.bind.annotation.*; import org.springframework.beans.factory.annotation.Autowired; @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @PostMapping("/send") public String sendMessage(@RequestParam String msg) { kafkaProducer.sendMessage("test-topic", msg); return "Message sent: " + msg; } }
啟動 Kafka 環(huán)境(可選)
如果你還沒有運(yùn)行 Kafka,可以按照以下步驟快速啟動:
啟動 Zookeeper(Kafka 依賴)
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動 Kafka 服務(wù)
bin/kafka-server-start.sh config/server.properties
創(chuàng)建測試 Topic
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
測試接口
啟動 Spring Boot 應(yīng)用后,訪問如下接口發(fā)送消息:
POST http://localhost:8080/kafka/send?msg=HelloKafka
觀察控制臺輸出,確認(rèn)是否收到類似以下內(nèi)容:
Received message: topic - test-topic, partition - 0, offset - 5, key - null, value - HelloKafka
擴(kuò)展功能建議
使用 JSON 格式傳輸對象(自定義序列化/反序列化)
多消費(fèi)者組配置與負(fù)載均衡
異常處理與重試機(jī)制(@DltHandler, SeekToCurrentErrorHandler)
Kafka Streams 實(shí)現(xiàn)實(shí)時流處理邏輯
配置 SSL、SASL 安全認(rèn)證
結(jié)合 Spring Cloud Stream 構(gòu)建云原生事件驅(qū)動架構(gòu)
到此這篇關(guān)于Spring Boot集成Apache Kafka的實(shí)戰(zhàn)指南的文章就介紹到這了,更多相關(guān)SpringBoot集成Apache Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一次由Lombok的@AllArgsConstructor注解引發(fā)的錯誤及解決
這篇文章主要介紹了一次由Lombok的@AllArgsConstructor注解引發(fā)的錯誤及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09將Mybatis升級為Mybatis-Plus的詳細(xì)過程
本文詳細(xì)介紹了在若依管理系統(tǒng)(v3.8.8)中將MyBatis升級為MyBatis-Plus的過程,旨在提升開發(fā)效率,通過本文,開發(fā)者可實(shí)現(xiàn)系統(tǒng)功能無損升級,同時享受MyBatis-Plus帶來的便捷特性,如代碼簡化和性能優(yōu)化,需要的朋友可以參考下2025-04-04