springboot如何開啟和關(guān)閉kafka消費(fèi)
更新時間:2024年12月16日 09:44:17 作者:阿拉的夢想
在Kafka消費(fèi)者中,通過關(guān)閉自動消費(fèi)配置,使用自定義容器工廠,并在消費(fèi)監(jiān)聽器上設(shè)置id,可以手動控制消費(fèi)的開啟和關(guān)閉,這是根據(jù)個人經(jīng)驗(yàn)總結(jié)的方法,旨在幫助其他開發(fā)者
springboot開啟和關(guān)閉kafka消費(fèi)
關(guān)閉kafka自動消費(fèi)
配置自定義容器工廠
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.stereotype.Component; @Component @Configuration public class kafkaConfig { @Autowired private ConsumerFactory<String, String> consumerFactory; @Bean("pingKafkaFactory") public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>(); container.setConsumerFactory(consumerFactory); //禁止自動啟動 container.setAutoStartup(false); return container; } }
在消費(fèi)監(jiān)聽器上使用工廠,并設(shè)置id
@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")
這樣,啟動項(xiàng)目后,就不會自動消費(fèi)了。
手動開啟和關(guān)閉消費(fèi)
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.stereotype.Service; /** * Kafka消費(fèi)監(jiān)聽服務(wù)實(shí)現(xiàn)類. */ @Service @Slf4j public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService { /** * registry. */ @Autowired private KafkaListenerEndpointRegistry registry; /** * 開啟監(jiān)聽. * * @param listenerId 監(jiān)聽ID */ @Override public void startListener(String listenerId) { //判斷監(jiān)聽容器是否啟動,未啟動則將其啟動 if (!registry.getListenerContainer(listenerId).isRunning()) { registry.getListenerContainer(listenerId).start(); } //項(xiàng)目啟動的時候監(jiān)聽容器是未啟動狀態(tài),而resume是恢復(fù)的意思不是啟動的意思 //registry.getListenerContainer(listenerId).stop(); log.info(listenerId + "開啟監(jiān)聽成功。"); } /** * 停止監(jiān)聽. * * @param listenerId 監(jiān)聽ID */ @Override public void stopListener(String listenerId) { registry.getListenerContainer(listenerId).stop(); log.info(listenerId + "停止監(jiān)聽成功。"); } }
總結(jié)
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
把Jar文件轉(zhuǎn)成exe安裝文件的實(shí)現(xiàn)方法
下面小編就為大家?guī)硪黄袹ar文件轉(zhuǎn)成exe安裝文件的實(shí)現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-11-11詳解使用Spring Security進(jìn)行自動登錄驗(yàn)證
本篇文章主要介紹了詳解使用Spring Security進(jìn)行自動登錄驗(yàn)證,非常具有實(shí)用價值,需要的朋友可以參考下2017-09-09Springboot項(xiàng)目編寫測試單元完整步驟記錄
這篇文章主要介紹了如何使用JUnit編寫Spring?Boot項(xiàng)目中的測試單元,包括引入依賴、配置文件設(shè)置、啟動文件創(chuàng)建以及編寫測試類的步驟,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2025-03-03Java并發(fā)編程之常用的多線程實(shí)現(xiàn)方式分析
這篇文章主要介紹了Java并發(fā)編程之常用的多線程實(shí)現(xiàn)方式,結(jié)合實(shí)例形式分析了java并發(fā)編程中多線程的相關(guān)原理、實(shí)現(xiàn)方法與操作注意事項(xiàng),需要的朋友可以參考下2020-02-02