Integrating Kafka with Spring Boot and delaying consumer startup
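When integrating Kafka with Spring Boot, consumers are usually declared with @KafkaListener, but a consumer declared this way is started as soon as Spring starts. If a specific consumer has to be started later, depending on configuration, this approach cannot be used.
Using the class KafkaListenerAnnotationBeanPostProcessor as a reference, I extracted part of its code. With it, consumers can be started dynamically whenever they are needed, and starting several consumers is just as easy.
For convenience, I defined a custom annotation: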
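import org.springframework.kafka.annotation.TopicPartition;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marks a method as a Kafka listener that is registered on demand instead of at startup.
@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface DelayKafkaConsumer {

    // Endpoint id; a generated id is used when this is empty.
    String id() default "";

    // Topics to subscribe to.
    String[] topics() default {};

    // Bean name of a KafkaListenerErrorHandler; no error handler when empty.
    String errorHandler() default "";

    // Consumer group id; when empty, an explicitly set id() is used instead.
    String groupId() default "";

    // Explicit topic/partition assignments.
    TopicPartition[] topicPartitions() default {};

    // Name under which the listener bean is visible to expressions during registration.
    String beanRef() default "__listener";
}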
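The annotation is retained at runtime so that it can be found by reflection when a consumer is started. The following is a minimal, framework-free sketch of that lookup. `DemoConsumer`, `DemoHandler` and `AnnotationScanDemo` are hypothetical names used only for illustration; the factory shown later relies on Spring's `MethodIntrospector` and `AnnotationUtils` rather than on this plain loop.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for @DelayKafkaConsumer, reduced to a single attribute.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoConsumer {
    String[] topics() default {};
}

// Hypothetical handler with one annotated method and one plain method.
class DemoHandler {
    @DemoConsumer(topics = {"orders", "payments"})
    public void onMessage(String payload) { }

    public void helper() { }
}

class AnnotationScanDemo {
    // Returns "methodName -> [topics]" for every method that carries @DemoConsumer.
    static List<String> scan(Class<?> type) {
        List<String> found = new ArrayList<>();
        for (Method method : type.getDeclaredMethods()) {
            DemoConsumer ann = method.getAnnotation(DemoConsumer.class);
            if (ann != null) {
                found.add(method.getName() + " -> " + Arrays.toString(ann.topics()));
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(DemoHandler.class));
    }
}
```

With `RetentionPolicy.SOURCE` or `CLASS` the call to `getAnnotation` would return null, which is why the annotation above declares `RUNTIME`.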
The factory that works together with the annotation:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.*;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.format.Formatter;
import org.springframework.format.FormatterRegistry;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.*;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware {
private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class);
private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar;
private final AtomicInteger counter = new AtomicInteger();
private BeanFactory beanFactory;
private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
private BeanExpressionContext expressionContext;
private final ListenerScope listenerScope = new ListenerScope();
private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
new KafkaHandlerMethodFactoryAdapter();
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
this.kafkaListenerEndpointRegistrar = registrar;
addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService);
}
    // Registers an already-built endpoint. Once the application context has finished
    // initializing, the registrar creates and starts the container right away.
    public void startConsumer(KafkaListenerEndpoint endpoint) {
        Assert.state(kafkaListenerEndpointRegistrar != null,
                "configureKafkaListeners has not been called yet");
        kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
    }

    // Scans target for @DelayKafkaConsumer methods and registers one endpoint per annotated method.
    public void startConsumer(Object target) {
        logger.info("start consumer {} ...", target.getClass());
        Class<?> targetClass = AopUtils.getTargetClass(target);
        Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() {
                    @Override
                    public Set<DelayKafkaConsumer> inspect(Method method) {
                        Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }
                });
        if (annotatedMethods.isEmpty()) {
            throw new IllegalArgumentException(
                    target.getClass() + " must have at least one method annotated with @DelayKafkaConsumer");
        }
        for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) {
            Method method = entry.getKey();
            logger.info("found message listener method : {} , object : {}", method.getName(), target.getClass());
            for (DelayKafkaConsumer listener : entry.getValue()) {
                // Skip only when nothing is configured; topicPartitions alone is a valid configuration.
                if (listener.topics().length == 0 && listener.topicPartitions().length == 0) {
                    logger.info("neither topics nor topicPartitions is set, skipping method : {} , target object : {}",
                            method.getName(), target.getClass());
                    continue;
                }
                processKafkaListener(listener, method, target);
                logger.info("registered method {} successfully , target object : {}", method.getName(), target.getClass());
            }
        }
        logger.info("{} consumer start complete .", target.getClass());
    }
protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<Object, Object> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
}
processListener(endpoint, kafkaListener, bean, methodToUse);
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean,
Object adminTarget) {
String beanRef = kafkaListener.beanRef();
if (StringUtils.hasText(beanRef)) {
this.listenerScope.addListener(beanRef, bean);
}
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
}
}
private String getEndpointId(DelayKafkaConsumer kafkaListener) {
if (StringUtils.hasText(kafkaListener.id())) {
return resolve(kafkaListener.id());
}
else {
return "Custom-Consumer" + this.counter.getAndIncrement();
}
}
private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) {
String groupId = null;
if (StringUtils.hasText(kafkaListener.groupId())) {
groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
}
if (groupId == null && StringUtils.hasText(kafkaListener.id())) {
groupId = id;
}
return groupId;
}
private String[] resolveTopics(DelayKafkaConsumer kafkaListener) {
String[] topics = kafkaListener.topics();
List<String> result = new ArrayList<>();
if (topics.length > 0) {
for (int i = 0; i < topics.length; i++) {
Object topic = resolveExpression(topics[i]);
resolveAsString(topic, result);
}
}
return result.toArray(new String[result.size()]);
}
private void resolveAsString(Object resolvedValue, List<String> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolveAsString(object, result);
}
}
else if (resolvedValue instanceof String) {
result.add((String) resolvedValue);
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolveAsString(object, result);
}
}
else {
throw new IllegalArgumentException(String.format(
"@DelayKafkaConsumer can't resolve '%s' as a String", resolvedValue));
}
}
private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) {
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
List<TopicPartitionInitialOffset> result = new ArrayList<>();
if (topicPartitions.length > 0) {
for (TopicPartition topicPartition : topicPartitions) {
result.addAll(resolveTopicPartitionsList(topicPartition));
}
}
return result.toArray(new TopicPartitionInitialOffset[result.size()]);
}
private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
Object topic = resolveExpression(topicPartition.topic());
Assert.state(topic instanceof String,
"topic in @TopicPartition must resolve to a String, not " + topic.getClass());
Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
String[] partitions = topicPartition.partitions();
PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
"At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
List<TopicPartitionInitialOffset> result = new ArrayList<>();
for (int i = 0; i < partitions.length; i++) {
resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);
}
for (PartitionOffset partitionOffset : partitionOffsets) {
Object partitionValue = resolveExpression(partitionOffset.partition());
Integer partition;
if (partitionValue instanceof String) {
Assert.state(StringUtils.hasText((String) partitionValue),
"partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
partition = Integer.valueOf((String) partitionValue);
}
else if (partitionValue instanceof Integer) {
partition = (Integer) partitionValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
topic, partitionOffset.partition(), partitionValue.getClass()));
}
Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
Long initialOffset;
if (initialOffsetValue instanceof String) {
Assert.state(StringUtils.hasText((String) initialOffsetValue),
"'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
initialOffset = Long.valueOf((String) initialOffsetValue);
}
else if (initialOffsetValue instanceof Long) {
initialOffset = (Long) initialOffsetValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
}
Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
Boolean relativeToCurrent;
if (relativeToCurrentValue instanceof String) {
relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
}
else if (relativeToCurrentValue instanceof Boolean) {
relativeToCurrent = (Boolean) relativeToCurrentValue;
}
else {
throw new IllegalArgumentException(String.format(
"@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
}
TopicPartitionInitialOffset topicPartitionOffset =
new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent);
if (!result.contains(topicPartitionOffset)) {
result.add(topicPartitionOffset);
}
else {
throw new IllegalArgumentException(
String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
topicPartitionOffset));
}
}
return result;
}
private void resolvePartitionAsInteger(String topic, Object resolvedValue,
List<TopicPartitionInitialOffset> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
}
}
else if (resolvedValue instanceof String) {
Assert.state(StringUtils.hasText((String) resolvedValue),
"partition in @TopicPartition for topic '" + topic + "' cannot be empty");
result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
}
else if (resolvedValue instanceof Integer[]) {
for (Integer partition : (Integer[]) resolvedValue) {
result.add(new TopicPartitionInitialOffset(topic, partition));
}
}
else if (resolvedValue instanceof Integer) {
result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolvePartitionAsInteger(topic, object, result);
}
}
else {
throw new IllegalArgumentException(String.format(
"@DelayKafkaConsumer for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
}
}
private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) {
Set<DelayKafkaConsumer> listeners = new HashSet<>();
DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer.class);
if (ann != null) {
listeners.add(ann);
}
return listeners;
}
private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
try {
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
for (Class<?> iface : proxiedInterfaces) {
try {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
catch (NoSuchMethodException noMethod) {
// The method is not declared on this interface; try the next one.
}
}
}
catch (SecurityException ex) {
ReflectionUtils.handleReflectionException(ex);
}
catch (NoSuchMethodException ex) {
throw new IllegalStateException(String.format(
"target method '%s' found on bean target class '%s', " +
"but not found in any interface(s) for bean JDK proxy. Either " +
"pull the method up to an interface or switch to subclass (CGLIB) " +
"proxies by setting proxy-target-class/proxyTargetClass " +
"attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
}
}
return method;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
this.listenerScope);
}
}
private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
}
else {
throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
}
private Object resolveExpression(String value) {
String resolvedValue = resolve(value);
return this.resolver.evaluate(resolvedValue, this.expressionContext);
}
/**
* Resolve the specified value if possible.
* @param value the value to resolve
* @return the resolved value
* @see ConfigurableBeanFactory#resolveEmbeddedValue
*/
private String resolve(String value) {
if (this.beanFactory instanceof ConfigurableBeanFactory) {
return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
}
return value;
}
private void addFormatters(FormatterRegistry registry) {
for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
registry.addConverter(converter);
}
for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
registry.addConverter(converter);
}
for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter.class)) {
registry.addFormatter(formatter);
}
}
private <T> Collection<T> getBeansOfType(Class<T> type) {
if (this.beanFactory instanceof ListableBeanFactory) {
return ((ListableBeanFactory) this.beanFactory).getBeansOfType(type).values();
}else {
return Collections.emptySet();
}
}
private static class ListenerScope implements Scope {
private final Map<String, Object> listeners = new HashMap<>();
ListenerScope() {
super();
}
public void addListener(String key, Object bean) {
this.listeners.put(key, bean);
}
public void removeListener(String key) {
this.listeners.remove(key);
}
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
return this.listeners.get(name);
}
@Override
public Object remove(String name) {
return null;
}
@Override
public void registerDestructionCallback(String name, Runnable callback) {
}
@Override
public Object resolveContextualObject(String key) {
return this.listeners.get(key);
}
@Override
public String getConversationId() {
return null;
}
}
private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
private MessageHandlerMethodFactory messageHandlerMethodFactory;
public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
}
@Override
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
}
private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
if (this.messageHandlerMethodFactory == null) {
this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
}
return this.messageHandlerMethodFactory;
}
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
defaultFactory.setBeanFactory(MyKafkaConsumerFactory.this.beanFactory);
ConfigurableBeanFactory cbf =
(MyKafkaConsumerFactory.this.beanFactory instanceof ConfigurableBeanFactory ?
(ConfigurableBeanFactory) MyKafkaConsumerFactory.this.beanFactory : null);
defaultFactory.setConversionService(this.defaultFormattingConversionService);
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
// Annotation-based argument resolution
argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
argumentResolvers.add(new HeadersMethodArgumentResolver());
// Type-based argument resolution
final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
@Override
protected boolean isEmptyPayload(Object payload) {
return payload == null || payload instanceof KafkaNull;
}
});
defaultFactory.setArgumentResolvers(argumentResolvers);
defaultFactory.afterPropertiesSet();
return defaultFactory;
}
}
}
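The naming rules in getEndpointId and getEndpointGroupId can be easier to follow in isolation. The sketch below restates them without Spring and without expression resolution, which the real methods perform. `EndpointNamingSketch` is a hypothetical class written only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class EndpointNamingSketch {
    private final AtomicInteger counter = new AtomicInteger();

    // An explicit id wins; otherwise a generated "Custom-Consumer<n>" id is used.
    String endpointId(String declaredId) {
        return hasText(declaredId) ? declaredId : "Custom-Consumer" + counter.getAndIncrement();
    }

    // An explicit groupId wins; otherwise an explicitly declared id doubles as the group id.
    // A generated id is never used as the group id, so the result can be null.
    String groupId(String declaredGroupId, String declaredId) {
        if (hasText(declaredGroupId)) {
            return declaredGroupId;
        }
        return hasText(declaredId) ? declaredId : null;
    }

    private static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    public static void main(String[] args) {
        EndpointNamingSketch naming = new EndpointNamingSketch();
        System.out.println(naming.endpointId(""));
        System.out.println(naming.groupId("", "orders"));
    }
}
```

When the group id comes out as null, the consumer's group presumably has to come from the consumer factory's own configuration, so it seems safest to set either id or groupId on the annotation.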
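Call startConsumer to start a consumer (calling it several times starts several consumers). The target must contain at least one method annotated with @DelayKafkaConsumer, which works much like @KafkaListener. I removed some features and kept the commonly used ones.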
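A possible way to wire this up is sketched below. It is an illustration under several assumptions, not part of the original code: `OrderEventHandler`, `DelayedConsumerStarter`, the topic and group names, and the property `app.order-consumer.enabled` are all hypothetical. The classes are assumed to live in the same package as `DelayKafkaConsumer` and `MyKafkaConsumerFactory`, and the application is assumed to use Spring Boot's Kafka auto-configuration with String deserializers.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

// Hypothetical listener bean; nothing is consumed until startConsumer is called with it.
@Component
class OrderEventHandler {

    @DelayKafkaConsumer(id = "order-consumer", topics = "order-events", groupId = "order-group")
    public void onMessage(String payload) {
        System.out.println("received: " + payload);
    }
}

// Hypothetical starter that decides from configuration whether the consumer should run.
@Component
class DelayedConsumerStarter {

    private final MyKafkaConsumerFactory consumerFactory;
    private final OrderEventHandler handler;
    private final boolean enabled;

    DelayedConsumerStarter(MyKafkaConsumerFactory consumerFactory,
                           OrderEventHandler handler,
                           @Value("${app.order-consumer.enabled:false}") boolean enabled) {
        this.consumerFactory = consumerFactory;
        this.handler = handler;
        this.enabled = enabled;
    }

    // Runs after the context is fully started, so the registrar has already been handed to the factory.
    @EventListener(ApplicationReadyEvent.class)
    public void startIfEnabled() {
        if (enabled) {
            consumerFactory.startConsumer(handler);
        }
    }
}
```

The same call could be made from any other trigger, such as a scheduled task or an admin endpoint, as long as it happens after the application context has finished starting.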
This gives an annotation-based way to control consumers dynamically in a Spring Boot project. There are other ways to achieve the same effect, but I find this one the most convenient.
Java project using a Kafka consumer with Spring Boot fails to start: Failed to construct kafka consumer
As mentioned in an earlier post, our company works on IoT projects. The project uses mqtt+kafka to communicate with devices. The protocol used to be JSON and has now been changed to byte arrays (byte[]).
There are plenty of demos online for the Spring Boot integration itself; I will publish a Kafka demo when I have time.
The error message is as follows:
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Cause analysis:
When communication used JSON, the Kafka consumer factory was built for a ConcurrentMessageListenerContainer whose key type was String. The value type is now byte[], so the consumer factory has to be built with the correct value type.
The code is as follows:
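// Container factory for listeners whose record values are raw bytes.
// consumerByteFactory() and concurrency are defined elsewhere in the configuration class (not shown).
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerByteFactory());
    factory.setConcurrency(concurrency);
    // Maximum time, in milliseconds, that each poll() call blocks waiting for records.
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}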
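The method consumerByteFactory() is not shown in the post. One plausible ingredient, assuming that is where the deserializers are configured, is a property map whose value deserializer produces byte[]. The sketch below builds such a map with plain Java. `ByteConsumerProps`, the broker address and the group name are hypothetical. The keys are the standard Kafka consumer configuration names, and the deserializer classes are given by name, which Kafka accepts for these settings.

```java
import java.util.HashMap;
import java.util.Map;

class ByteConsumerProps {

    // Builds consumer properties with String keys and byte[] values.
    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id, for illustration only.
        System.out.println(build("localhost:9092", "device-group").get("value.deserializer"));
    }
}
```

A map like this could then be passed to `new DefaultKafkaConsumerFactory<String, byte[]>(props)` so that the generic types of the factory and the configured deserializers agree.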
A complete Kafka producer and consumer demo will be put together in upcoming posts.
The above is my personal experience. I hope it serves as a useful reference, and I hope you will continue to support 腳本之家.