springboot如何開啟和關(guān)閉kafka消費
更新時間:2024年12月16日 09:44:17 作者:阿拉的夢想
在Kafka消費者中,通過關(guān)閉自動消費配置,使用自定義容器工廠,并在消費監(jiān)聽器上設(shè)置id,可以手動控制消費的開啟和關(guān)閉,這是根據(jù)個人經(jīng)驗總結(jié)的方法,旨在幫助其他開發(fā)者
springboot開啟和關(guān)閉kafka消費
關(guān)閉kafka自動消費
配置自定義容器工廠
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;
@Component
@Configuration
public class kafkaConfig {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Bean("pingKafkaFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
container.setConsumerFactory(consumerFactory);
//禁止自動啟動
container.setAutoStartup(false);
return container;
}
}
在消費監(jiān)聽器上使用工廠,并設(shè)置id
@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")這樣,啟動項目后,就不會自動消費了。
手動開啟和關(guān)閉消費
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;
/**
* Kafka消費監(jiān)聽服務(wù)實現(xiàn)類.
*/
@Service
@Slf4j
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {
/**
* registry.
*/
@Autowired
private KafkaListenerEndpointRegistry registry;
/**
* 開啟監(jiān)聽.
*
* @param listenerId 監(jiān)聽ID
*/
@Override
public void startListener(String listenerId) {
//判斷監(jiān)聽容器是否啟動,未啟動則將其啟動
if (!registry.getListenerContainer(listenerId).isRunning()) {
registry.getListenerContainer(listenerId).start();
}
//項目啟動的時候監(jiān)聽容器是未啟動狀態(tài),而resume是恢復(fù)的意思不是啟動的意思
//registry.getListenerContainer(listenerId).stop();
log.info(listenerId + "開啟監(jiān)聽成功。");
}
/**
* 停止監(jiān)聽.
*
* @param listenerId 監(jiān)聽ID
*/
@Override
public void stopListener(String listenerId) {
registry.getListenerContainer(listenerId).stop();
log.info(listenerId + "停止監(jiān)聽成功。");
}
}總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
springboot執(zhí)行延時任務(wù)之DelayQueue實例
這篇文章主要介紹了springboot執(zhí)行延時任務(wù)之DelayQueue實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02
mybatis-plus返回map自動轉(zhuǎn)駝峰配置操作
這篇文章主要介紹了mybatis-plus返回map自動轉(zhuǎn)駝峰配置操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11

