Java中Springboot集成Kafka實現(xiàn)消息發(fā)送和接收功能
一、Kafka 簡介
Kafka 是由 Apache 軟件基金會開發(fā)的一個開源流處理平臺,最初由 LinkedIn 公司開發(fā),并于 2011 年開源。它是一種高吞吐量的分布式發(fā)布 - 訂閱消息系統(tǒng),以可持久化、高吞吐、低延遲、高容錯等特性而著稱。
Kafka 主要由生產(chǎn)者(Producer)、消費者(Consumer)、主題(Topic)、分區(qū)(Partition)和代理(Broker)等組件構(gòu)成。生產(chǎn)者負(fù)責(zé)將數(shù)據(jù)發(fā)送到 Kafka 集群,消費者從集群中讀取數(shù)據(jù)。主題是一種邏輯上的分類,數(shù)據(jù)被發(fā)送到特定的主題。每個主題又可以劃分為多個分區(qū),以實現(xiàn)數(shù)據(jù)的并行處理和提高系統(tǒng)的可擴展性。代理則是 Kafka 集群中的服務(wù)器節(jié)點,負(fù)責(zé)接收和存儲生產(chǎn)者發(fā)送的數(shù)據(jù),并為消費者提供數(shù)據(jù)讀取服務(wù)。
二、Kafka 功能
消息隊列功能:Kafka 可以作為消息隊列使用,在應(yīng)用程序之間傳遞消息。生產(chǎn)者將消息發(fā)送到主題,不同的消費者可以從主題中訂閱并消費消息,實現(xiàn)應(yīng)用程序解耦。例如,在電商系統(tǒng)中,訂單生成模塊可以將訂單消息發(fā)送到 Kafka 主題,后續(xù)的庫存管理、物流配送等模塊可以從該主題消費訂單消息,各自獨立處理,降低模塊間的耦合度。
數(shù)據(jù)存儲功能:Kafka 具有持久化存儲能力,它將消息數(shù)據(jù)存儲在磁盤上,并且通過多副本機制保證數(shù)據(jù)的可靠性。即使某個節(jié)點出現(xiàn)故障,數(shù)據(jù)也不會丟失。這種特性使得 Kafka 不僅可以作為消息隊列,還能用于數(shù)據(jù)的長期存儲和備份,例如用于存儲系統(tǒng)的操作日志,方便后續(xù)的數(shù)據(jù)分析和故障排查。
流處理功能:Kafka 可以與流處理框架(如 Apache Flink、Spark Streaming 等)集成,對實時數(shù)據(jù)流進行處理。通過將實時數(shù)據(jù)發(fā)送到 Kafka 主題,流處理框架可以從主題中讀取數(shù)據(jù)并進行實時計算、分析和轉(zhuǎn)換。例如,在實時監(jiān)控系統(tǒng)中,通過 Kafka 收集服務(wù)器的性能指標(biāo)數(shù)據(jù),然后使用流處理框架對這些數(shù)據(jù)進行實時分析,及時發(fā)現(xiàn)性能異常并發(fā)出警報。
三、POM依賴
<!-- kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.11</version> </dependency>
四、配置文件
spring: # Kafka 配置 kafka: # Kafka 服務(wù)器地址和端口 代理地址,可以多個 bootstrap-servers: IP:9092 # 生產(chǎn)者配置 producer: # 發(fā)送失敗時的重試次數(shù) retries: 3 # 每次批量發(fā)送消息的數(shù)量,調(diào)整為較小值 batch-size: 1 # 生產(chǎn)者緩沖區(qū)大小 buffer-memory: 33554432 # 消息 key 的序列化器,將 key 序列化為字節(jié)數(shù)組 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息 value 的序列化器,將消息體序列化為字節(jié)數(shù)組 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消費者配置 consumer: # 當(dāng)沒有初始偏移量或當(dāng)前偏移量不存在時,從最早的消息開始消費 auto-offset-reset: earliest # 是否自動提交偏移量 enable-auto-commit: true # 自動提交偏移量的時間間隔(毫秒),延長自動提交時間間隔 auto-commit-interval: 1000 # 消息 key 的反序列化器,將字節(jié)數(shù)組反序列化為 key key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消息 value 的反序列化器,將字節(jié)數(shù)組反序列化為消息體 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
五、生產(chǎn)者
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** * 生產(chǎn)者 * * @author chenlei */ @Slf4j @Component public class KafkaProducer { /** * KafkaTemplate */ @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 發(fā)送消息到指定的 Kafka 主題,并可指定分組信息 * * @param topic 消息要發(fā)送到的 Kafka 主題 * @param message 要發(fā)送的消息內(nèi)容 */ public void sendMessage(String topic, String message) { // 使用 KafkaTemplate 發(fā)送消息,將消息發(fā)送到指定的主題 ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { // 消息發(fā)送成功后的處理邏輯,可根據(jù)需要添加 log.info("已發(fā)送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { // 消息發(fā)送失敗后的處理邏輯,使用日志記錄異常 log.error("發(fā)送消息=[" + message + "] 失敗", ex); } }); } }
六、消費者
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @author 消費者 * chenlei */ @Slf4j @Component public class KafkaConsumer { /** * 監(jiān)聽 Kafka 主題方法。 * * @param record 從 Kafka 接收到的 ConsumerRecord,包含消息的鍵值對 */ @KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5") public void listen(ConsumerRecord<?, ?> record) { // 打印接收到的消息的詳細(xì)信息 log.info("接收到 Kafka 消息: 主題 = {}, 分區(qū) = {}, 偏移量 = {}, 鍵 = {}, 值 = {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } }
到此這篇關(guān)于Java中Springboot集成Kafka實現(xiàn)消息發(fā)送和接收的文章就介紹到這了,更多相關(guān)Springboot Kafka 消息發(fā)送和接收內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot?log4j2日志框架整合與使用過程解析
這篇文章主要介紹了springboot?log4j2日志框架整合與使用,包括引入maven依賴和添加配置文件log4j2-spring.xml的相關(guān)知識,需要的朋友可以參考下2022-05-05Java從數(shù)據(jù)庫中讀取Blob對象圖片并顯示的方法
這篇文章主要介紹了Java從數(shù)據(jù)庫中讀取Blob對象圖片并顯示的方法,實例分析了Java讀取數(shù)據(jù)庫中Blob對象圖片的技巧與操作方法,需要的朋友可以參考下2015-02-02詳解Spring Cloud Gateway 數(shù)據(jù)庫存儲路由信息的擴展方案
這篇文章主要介紹了詳解Spring Cloud Gateway 數(shù)據(jù)庫存儲路由信息的擴展方案,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-11-11SpringBoot接入ftp/ftps并上傳文件和配置的代碼指南
接入ftp服務(wù)器,在springboot上實現(xiàn)起來也不算復(fù)雜,本文主要講下如何在springboot下接入ftp服務(wù)上傳文件,并對出現(xiàn)的問題做一些記錄,ftp服務(wù)的參數(shù)配置等,需要的朋友可以參考下2024-12-12