SpringBoot3+Kafka實戰(zhàn)指南
1. 項目層級
像火鍋店的分工:點單員、傳菜員、食客清清楚楚。
kafka/
├── pom.xml # 根 POM(BOM對齊)
├── provider/ # 點單:生產(chǎn)者
│ ├── pom.xml # 子模塊 POM
│ └── src/main/java/org/example/provider/
│ ├── ProviderApplication.java
│ ├── conf/KafkaTopicsConfig.java
│ ├── controller/ProviderController.java
│ └── service/KafkaProducerService.java
│ └── src/main/resources/application.yaml
└── consumer/ # 上桌:消費者
├── pom.xml # 子模塊 POM
└── src/main/java/org/example/consumer/
├── ConsumerApplication.java
└── listener/KafkaConsumerListener.java
└── src/main/resources/application.yaml
2. 根 POM(大廚的調(diào)料表)
<modules>
<module>provider</module>
<module>consumer</module>
</modules>
<properties>
<java.version>17</java.version>
<spring.boot.version>3.4.3</spring.boot.version>
<spring.cloud.version>2024.0.2</spring.cloud.version>
</properties>
<!-- 關(guān)鍵:用 BOM 管理依賴版本(不用 parent 也行) -->
<dependencyManagement>
<dependencies>
<!-- Spring Boot 依賴版本對齊(含 starter、lombok 等) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Spring Cloud 依賴版本對齊 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- hutool工具類 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-ai</artifactId>
<version>5.8.38</version>
</dependency>
</dependencies>
?? 全局版本對齊,避免“鍋底和食材不搭”。
3. 子模塊 POM
3.1 provider/pom.xml
<parent>
<groupId>org.example</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>provider</name>
<description>provider</description>
<packaging>jar</packaging>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3.2 consumer/pom.xml
<parent>
<groupId>org.example</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>consumer</description>
<packaging>jar</packaging>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.58</version>
</dependency>
</dependencies>
4. 配置(菜單寫清楚)
4.1 Provider(application.yaml - 生產(chǎn)者)
server:
port: 1003 # 本模塊 HTTP 端口
app:
kafka:
topic: demo.topic.v1 # 要發(fā)送/創(chuàng)建的主題名
auto-create-topic: true # 開啟后,會注冊 NewTopic bean 從而在啟動時創(chuàng)建主題(見 KafkaTopicsConfig)
spring:
kafka:
bootstrap-servers: yiqiquhuxi.cn:9092
# 數(shù)據(jù)網(wǎng)絡(luò)IO 序列化方式
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 發(fā)送字符串
# 可靠性
acks: all
retries: 3
4.2 Consumer(application.yaml - 消費者)
server:
port: 1004 # 本模塊端口(通常只看日志)
app:
kafka:
topic: demo.topic.v1 # 要訂閱的主題名(與 provider 保持一致)
spring:
kafka:
bootstrap-servers: yiqiquhuxi.cn:9092
consumer:
group-id: demo-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 接收字符串
# 說明:
# - JsonDeserializer 的默認類型鍵為 spring.json.value.default.type(源碼常量 VALUE_DEFAULT_TYPE)。:contentReference[oaicite:7]{index=7}
# - @KafkaListener 支持使用 ${...} 占位符讀取上述配置。:contentReference[oaicite:8]{index=8}
5. 核心代碼(廚師上陣)
5.1 入口
@SpringBootApplication
public class ProviderApplication {
public static void main(String[] args) { SpringApplication.run(ProviderApplication.class, args); }
}
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
}
5.2 消息模型
public record MessagePayload(String id, String content, long ts) {}
5.3 Provider(點菜 + 上菜)
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafka;
@Value("${app.kafka.topic}")
private String topic;
public void send(String content) {
MessagePayload payload = new MessagePayload(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
//序列化
String jsonStr = JSONUtil.toJsonStr(payload);
kafka.send(topic, jsonStr);
}
}
@RestController
@RequestMapping("/provider")
public class ProviderController {
@Autowired private KafkaProducerService producer;
@GetMapping("/done") public String done() { producer.send("done"); return "done"; }
}
5.4 Provider(創(chuàng)建 Topic)
@Configuration
public class KafkaTopicsConfig {
@Value("${app.kafka.topic}")
private String topic;
// 只有當(dāng) app.kafka.auto-create-topic=true(或缺省并 matchIfMissing=true)才注冊 NewTopic
@Bean
@ConditionalOnProperty(name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true)
public NewTopic demoTopic() {
// 分區(qū)/副本按你的集群實際調(diào)整;單 Broker 可用 (3,1)
return new NewTopic(topic, 3, (short) 1);
}
}
?? 有了它,就不用手動 kafka-topics.sh --create,Spring Boot 啟動時就能幫你“先起鍋燒水”。
5.5 Consumer(開吃)
@Slf4j
@Component
public class KafkaConsumerListener {
@KafkaListener(
topics = "${app.kafka.topic}",
groupId = "${spring.kafka.consumer.group-id}"
)
public void onMessage(String msg) {
try {
// json反序列化成對象
MessagePayload payload = JSON.parseObject(msg, MessagePayload.class);
log.info("? received: id={}, content={}, ts={}",
payload.id(), payload.content(), payload.ts());
} catch (Exception e) {
log.error("? JSON解析失敗,原始消息: {}", msg, e);
}
}
}
6. 運行流程
- 點火:Kafka Broker 先啟動
- 開店:先跑 consumer,再跑 provider
- 點單:
GET http://localhost:1003/provider/done - 吃菜:consumer 日志里出現(xiàn) ?? → 成功!
7. 常見坑
- 鍋點不著:
bootstrap-servers不通,先查網(wǎng)絡(luò) - 沒菜:topic 不存在?開
auto-create-topic - 吃不到:改
group-id或加auto-offset-reset=earliest - 串味了:序列化不匹配 Producer 默認用
StringSerializer,Consumer 卻用JsonDeserializer,兩邊火候不對,消息就“夾生”了。- 建議:
- 如果傳字符串,就都用
StringSerializer/StringDeserializer。 - 如果傳對象,就統(tǒng)一用
JsonSerializer/JsonDeserializer,并在application.yaml里顯式聲明spring.json.value.default.type。
- 如果傳字符串,就都用
- 建議:
8. 總結(jié)
Spring Boot + Kafka 的套路:
?? Provider 點單,Kafka 傳菜,Consumer 開吃。
到此這篇關(guān)于SpringBoot3+Kafka實戰(zhàn)指南的文章就介紹到這了,更多相關(guān)SpringBoot3 Kafka實戰(zhàn)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringBoot項目整合Vue做一個完整的用戶注冊功能
本文主要介紹了SpringBoot項目整合Vue做一個完整的用戶注冊功能,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
SSH框架網(wǎng)上商城項目第14戰(zhàn)之商城首頁UI的設(shè)計
這篇文章主要為大家詳細介紹了SSH框架網(wǎng)上商城項目第14戰(zhàn)之商城首頁UI的設(shè)計,感興趣的小伙伴們可以參考一下2016-06-06
MyBatis-Plus QueryWrapper及LambdaQueryWrapper的使用詳解
這篇文章主要介紹了MyBatis-Plus QueryWrapper及LambdaQueryWrapper的使用詳解,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-03-03
java微信企業(yè)號開發(fā)之開發(fā)模式的開啟
這篇文章主要為大家詳細介紹了java微信企業(yè)號開發(fā)之開發(fā)模式的開啟方法,感興趣的小伙伴們可以參考一下2016-06-06
Spring?AI?+?ollama?本地搭建聊天?AI?功能
這篇文章主要介紹了Spring?AI?+?ollama?本地搭建聊天?AI?,本文通過實例圖文相結(jié)合給大家講解的非常詳細,需要的朋友可以參考下2024-11-11
Spring Boot2配置Swagger2生成API接口文檔詳情
這篇文章主要介紹了Spring Boot2配置Swagger2生成API接口文檔詳情,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09

