spring 整合kafka監(jiān)聽消費(fèi)的配置過程
前言
最近項(xiàng)目里有個(gè)需求,要消費(fèi)kafka里的數(shù)據(jù)。之前也手動(dòng)寫過代碼去消費(fèi)kafka數(shù)據(jù)。但是轉(zhuǎn)念一想。既然spring提供了消費(fèi)kafka的方法。就沒必要再去重復(fù)造輪子。于是嘗試使用spring的API。
項(xiàng)目技術(shù)背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。
整合過程
1. 引入spring-kafka的依賴包
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency>
2. 在spring的xml文件里增加配置項(xiàng),也可以單獨(dú)創(chuàng)建一個(gè)spring-context-XX.xml文件。
<!-- consumer configuration 該配置項(xiàng)可以根據(jù)自己業(yè)務(wù)的實(shí)際需求做增加或刪除--> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" /> <entry key="group.id" value="group" /> <entry key="enable.auto.commit" value="true" /> <entry key="auto.commit.interval.ms" value="3000" /> <entry key="session.timeout.ms" value="10000" /> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> <!-- create factory 該類是spring jar包里提供,就這么配置--> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties" /> </constructor-arg> </bean> <!-- 自定義的消費(fèi)類,需要實(shí)現(xiàn)spring的接口 --> <bean id="payPalConsumer" class="com.chao.service.consumer.PayPalConsumer" /> <!-- 該類也是jar包里提供的,注入的監(jiān)聽類是自己定義的,topic名稱是配置文件引入的--> <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties"> <constructor-arg name="topics" value="${kafka.paypal.topic.name}"/> <property name="messageListener" ref="payPalConsumer" /> </bean> <!-- 改類也是jar里提供的,把這個(gè)containerProperties和consumerfactory 注入 --> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory" /> <constructor-arg ref="containerProperties" /> </bean>
2. 自定義消費(fèi)者類,消費(fèi)者類依然可以使用注解。
/** * get msg from kafka */ @Component public class PayPalConsumer implements MessageListener<String, String> { private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class); @Autowired private XXService XXService; @Override public void onMessage(ConsumerRecord<String, String> authorizeRecord) { String value = authorizeRecord.value(); if (StringUtils.isEmpty(value)){ logger.warn("receive message from kafka is null"); return; } logger.info("receive message from kafka is {}",value); } }
使用這個(gè)步驟配置,一次性過。非常順利。
到此這篇關(guān)于spring 整合kafka監(jiān)聽消費(fèi)的配置過程的文章就介紹到這了,更多相關(guān)spring 整合kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis返回類型為Map時(shí)遇到的類型轉(zhuǎn)化的異常問題
這篇文章主要介紹了Mybatis返回類型為Map時(shí)遇到的類型轉(zhuǎn)化的異常問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12SpringBoot服務(wù)端數(shù)據(jù)校驗(yàn)過程詳解
這篇文章主要介紹了SpringBoot服務(wù)端數(shù)據(jù)校驗(yàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02Spring-boot原理及spring-boot-starter實(shí)例和代碼
spring-boot的starter是一個(gè)通過maven完成自包含并通過annotation配置使得可被spring上下文發(fā)現(xiàn)并實(shí)例化的一個(gè)可插拔的組件或服務(wù)。這篇文章主要介紹了Spring-boot原理及spring-boot-starter實(shí)例和代碼 ,需要的朋友可以參考下2019-06-06解決子線程無法訪問父線程中通過ThreadLocal設(shè)置的變量問題
這篇文章主要介紹了解決子線程無法訪問父線程中通過ThreadLocal設(shè)置的變量問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07Java中Scanner的常用方法總結(jié)(一次學(xué)懂)
這篇文章主要給大家介紹了關(guān)于Java中Scanner常用方法的相關(guān)資料,Java中的Scanner是一個(gè)用于讀取用戶輸入的類,它可以讀取各種類型的數(shù)據(jù),包括整數(shù)、浮點(diǎn)數(shù)、字符串等等,需要的朋友可以參考下2023-11-11Java8使用stream實(shí)現(xiàn)list中對(duì)象屬性的合并(去重并求和)
這篇文章主要介紹了Java8使用stream實(shí)現(xiàn)list中對(duì)象屬性的合并(去重并求和),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01java中調(diào)用https請(qǐng)求忽略ssl證書認(rèn)證代碼示例
在網(wǎng)絡(luò)請(qǐng)求中經(jīng)常會(huì)遇到需要忽略證書認(rèn)證的情況,這篇文章主要介紹了java中調(diào)用https請(qǐng)求忽略ssl證書認(rèn)證的相關(guān)資料,文中通過代碼示例介紹的非常詳細(xì),需要的朋友可以參考下2024-10-10