springboot如何開啟和關(guān)閉kafka消費(fèi)
更新時(shí)間:2024年12月16日 09:44:17 作者:阿拉的夢(mèng)想
在Kafka消費(fèi)者中,通過關(guān)閉自動(dòng)消費(fèi)配置,使用自定義容器工廠,并在消費(fèi)監(jiān)聽器上設(shè)置id,可以手動(dòng)控制消費(fèi)的開啟和關(guān)閉,這是根據(jù)個(gè)人經(jīng)驗(yàn)總結(jié)的方法,旨在幫助其他開發(fā)者
springboot開啟和關(guān)閉kafka消費(fèi)
關(guān)閉kafka自動(dòng)消費(fèi)
配置自定義容器工廠
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;
@Component
@Configuration
public class kafkaConfig {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Bean("pingKafkaFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
container.setConsumerFactory(consumerFactory);
//禁止自動(dòng)啟動(dòng)
container.setAutoStartup(false);
return container;
}
}
在消費(fèi)監(jiān)聽器上使用工廠,并設(shè)置id
@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")這樣,啟動(dòng)項(xiàng)目后,就不會(huì)自動(dòng)消費(fèi)了。
手動(dòng)開啟和關(guān)閉消費(fèi)
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;
/**
* Kafka消費(fèi)監(jiān)聽服務(wù)實(shí)現(xiàn)類.
*/
@Service
@Slf4j
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {
/**
* registry.
*/
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 開啟監(jiān)聽.
*
* @param listenerId 監(jiān)聽ID
*/
@Override
public void startListener(String listenerId) {
//判斷監(jiān)聽容器是否啟動(dòng),未啟動(dòng)則將其啟動(dòng)
if (!registry.getListenerContainer(listenerId).isRunning()) {
registry.getListenerContainer(listenerId).start();
}
//項(xiàng)目啟動(dòng)的時(shí)候監(jiān)聽容器是未啟動(dòng)狀態(tài),而resume是恢復(fù)的意思不是啟動(dòng)的意思
//registry.getListenerContainer(listenerId).stop();
log.info(listenerId + "開啟監(jiān)聽成功。");
}
/**
* 停止監(jiān)聽.
*
* @param listenerId 監(jiān)聽ID
*/
@Override
public void stopListener(String listenerId) {
registry.getListenerContainer(listenerId).stop();
log.info(listenerId + "停止監(jiān)聽成功。");
}
}總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
您可能感興趣的文章:
- Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
- springboot配置kafka批量消費(fèi),并發(fā)消費(fèi)方式
- SpringBoot整合Kafka完成生產(chǎn)消費(fèi)的方案
- spring-kafka使消費(fèi)者動(dòng)態(tài)訂閱新增的topic問題
- springboot集成kafka消費(fèi)手動(dòng)啟動(dòng)停止操作
- Springboot集成Kafka進(jìn)行批量消費(fèi)及踩坑點(diǎn)
- spring 整合kafka監(jiān)聽消費(fèi)的配置過程
- spring-kafka消費(fèi)異常處理
相關(guān)文章
springboot執(zhí)行延時(shí)任務(wù)之DelayQueue實(shí)例
這篇文章主要介紹了springboot執(zhí)行延時(shí)任務(wù)之DelayQueue實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
java 字符串詞頻統(tǒng)計(jì)實(shí)例代碼
java 字符串詞頻統(tǒng)計(jì)實(shí)例代碼,需要的朋友可以參考一下2013-03-03
使用Java實(shí)現(xiàn)價(jià)格加密與優(yōu)化功能
在現(xiàn)代軟件開發(fā)中,數(shù)據(jù)加密是一個(gè)非常重要的環(huán)節(jié),尤其是在處理敏感信息(如價(jià)格、用戶數(shù)據(jù)等)時(shí),本文將詳細(xì)介紹如何使用?Java?實(shí)現(xiàn)價(jià)格加密,并對(duì)代碼進(jìn)行優(yōu)化,需要的朋友可以參考下2025-01-01
mybatis-plus返回map自動(dòng)轉(zhuǎn)駝峰配置操作
這篇文章主要介紹了mybatis-plus返回map自動(dòng)轉(zhuǎn)駝峰配置操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-11-11

