springboot連接kafka集群的使用示例
一、環(huán)境搭建
1.1 springboot 環(huán)境
- JDK 11+
- Maven 3.8.x+
- springboot 2.5.4 +
1.2 kafka 依賴
springboot的pom文件導(dǎo)入
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency>
二、 kafka 配置類
2.1 發(fā)布者
2.1.1 配置
發(fā)布者我們使用 KafkaTemplate 來進(jìn)行消息發(fā)布,所以需要先對其進(jìn)行一些必要的配置。
@Configuration @EnableKafka public class KafkaConfig { /***** 發(fā)布者 *****/ //生產(chǎn)者工廠 @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } //生產(chǎn)者配置 @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } //生產(chǎn)者模板 @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
2.1.2 構(gòu)建發(fā)布者類
配置完發(fā)布者,下來就是發(fā)布消息,我們需要繼承 ProducerListener<K, V> 接口,該接口完整信息如下:
public interface ProducerListener<K, V> { void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata); void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception); }
實(shí)現(xiàn)該接口的方法,我們可以獲取包含發(fā)送結(jié)果(成功或失?。┑漠惒交卣{(diào),也就是可以在這個(gè)接口的實(shí)現(xiàn)中獲取發(fā)送結(jié)果。
我們簡單的實(shí)現(xiàn)構(gòu)建一個(gè)發(fā)布者類,接收主題和發(fā)布消息參數(shù),并打印發(fā)布結(jié)果。
@Component public class KafkaProducer implements ProducerListener<Object,Object> { private static final Logger producerlog = LoggerFactory.getLogger(KafkaProducer.class); private final KafkaTemplate<Integer, String> kafkaTemplate; public KafkaProducer(KafkaTemplate<Integer, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void producer (String msg,String topic){ ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic,0, msg); future.addCallback(new KafkaSendCallback<Integer, String>() { @Override public void onSuccess(SendResult<Integer, String> result) { producerlog.info("發(fā)送成功 {}", result); } @Override public void onFailure(KafkaProducerException ex) { ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord(); producerlog.info("發(fā)送失敗 {}",failed); } }); } }
2.1.3 發(fā)布消息
寫一個(gè)controller類來測試我們構(gòu)建的發(fā)布者類,這個(gè)類中打印接收到的消息,來確保信息接收不出問題。
@RestController public class KafkaTestController { private static final Logger kafkaTestLog = LoggerFactory.getLogger(KafkaTestController.class); @Resource private KafkaProducer kafkaProducer; @GetMapping("/kafkaTest") public void kafkaTest(String msg,String topic){ kafkaProducer.producer(msg,topic); kafkaTestLog.info("接收到消息 {} {}",msg,topic); } }
一切準(zhǔn)備就緒,我們啟動程序利用postman來進(jìn)行簡單的測試。
進(jìn)行消息發(fā)布:
發(fā)布結(jié)果:
可以看到消息發(fā)送成功。
我們再看看kafka消費(fèi)者有沒有接收到消息:
看以看到,kakfa的消費(fèi)者也接收到了消息。
2.2 消費(fèi)者
2.2.1 配置
消息的接受有多種方式,我們這里選擇的是使用 @KafkaListener 注解來進(jìn)行消息接收。它的使用像下面這樣:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }
看起來不是太難吧,但使用這個(gè)注解,我們需要配置底層 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。
我們在原來的kafka配置類 KafkaConfig 中,繼續(xù)配置消費(fèi)者,大概就像下面這樣
@Configuration @EnableKafka public class KafkaConfig { /***** 發(fā)布者 *****/ //生產(chǎn)者工廠 @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } //生產(chǎn)者配置 @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } //生產(chǎn)者模板 @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /***** 消費(fèi)者 *****/ //容器監(jiān)聽工廠 @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } //消費(fèi)者工廠 @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } //消費(fèi)者配置 @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class); props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000); return props; } }
注意,要設(shè)置容器屬性必須使用getContainerProperties()工廠方法。它用作注入容器的實(shí)際屬性的模板
2.2.2 構(gòu)建消費(fèi)者類
配置好后,我們就可以使用這個(gè)注解了。這個(gè)注解的使用有多種方式:
1、用它來覆蓋容器工廠的concurrency和屬性
@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") public void listen(String data) { ... }
2、可以使用顯式主題和分區(qū)(以及可選的初始偏移量)
@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
3、將初始偏移應(yīng)用于所有已分配的分區(qū)
@KafkaListener(id = "thing3", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }, partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
4、指定以逗號分隔的分區(qū)列表或分區(qū)范圍
@KafkaListener(id = "pp", autoStartup = "false", topicPartitions = @TopicPartition(topic = "topic1", partitions = "0-5, 7, 10-15")) public void process(String in) { ... }
5、可以向偵聽器提供Acknowledgment
@KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); }
6、添加標(biāo)頭
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list, @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys, @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { ... }
我們這里寫一個(gè)簡單的,只用它來接受指定主題的數(shù)據(jù):
@Component public class KafkaConsumer { private static final Logger consumerlog = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(topicPartitions = @TopicPartition(topic = "kafka-topic-test", partitions = "0")) public void consumer (String data){ consumerlog.info("消費(fèi)者接收數(shù)據(jù) {}",data); } }
這里解釋一下,因?yàn)槲覀冞M(jìn)行了手動分配主題/分區(qū),所以 注解中g(shù)roup.id 可以為空。若要指定group.id請?jiān)谙M(fèi)者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 @TopicPartition 注解后加上 groupId = “組id”
2.2.3 進(jìn)行消息消費(fèi)
繼續(xù)使用postman調(diào)用我們寫好的發(fā)布者發(fā)布消息,觀察控制臺的消費(fèi)者類是否有相關(guān)日志出現(xiàn)。
到此這篇關(guān)于springboot連接kafka集群的使用示例的文章就介紹到這了,更多相關(guān)springboot連接kafka集群內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示
- Kafka的安裝及接入SpringBoot的詳細(xì)過程
- springboot使用@KafkaListener監(jiān)聽多個(gè)kafka配置實(shí)現(xiàn)
- Spring?Boot中KafkaListener的介紹、原理和使用方法案例詳解
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- springboot+kafka中@KafkaListener動態(tài)指定多個(gè)topic問題
- Spring Boot 集成 Kafka的詳細(xì)步驟
相關(guān)文章
Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解
這篇文章主要介紹了Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解,文中用代碼舉例講解的很清晰,有感興趣的同學(xué)可以研究下2021-03-03IDEA中application.properties的圖標(biāo)顯示不正常的問題及解決方法
這篇文章主要介紹了IDEA中application.properties的圖標(biāo)顯示不正常的問題及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04Java基于NIO實(shí)現(xiàn)群聊系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java基于NIO實(shí)現(xiàn)群聊系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11講解Java設(shè)計(jì)模式編程中的建造者模式與原型模式
這篇文章主要介紹了Java設(shè)計(jì)模式編程中的建造者模式與原型模式,設(shè)計(jì)模式有利于團(tuán)隊(duì)開發(fā)過程中的代碼維護(hù),需要的朋友可以參考下2016-02-02SpringBoot+Spring Security基于內(nèi)存用戶認(rèn)證的實(shí)現(xiàn)
本文介紹了SpringBoot+Spring Security基于內(nèi)存用戶認(rèn)證的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11java創(chuàng)建txt文件并寫入內(nèi)容的方法代碼示例
這篇文章主要介紹了java創(chuàng)建txt文件并寫入內(nèi)容的兩種方法,分別是使用java.io.FileWriter和BufferedWriter,以及使用Java7的java.nio.file包中的Files和Path類,需要的朋友可以參考下2025-01-01