亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot實(shí)現(xiàn)kafka多源配置的示例代碼

 更新時(shí)間:2024年06月06日 10:42:53   作者:it噩夢(mèng)  
實(shí)際開發(fā)中,不同的topic可能來自不同的集群,所以就需要配置不同的kafka數(shù)據(jù)源,基于springboot自動(dòng)配置的思想,最終通過配置文件的配置,自動(dòng)生成生產(chǎn)者及消費(fèi)者的配置,本文介紹了SpringBoot實(shí)現(xiàn)kafka多源配置,需要的朋友可以參考下

背景

實(shí)際開發(fā)中,不同的topic可能來自不同的集群,所以就需要配置不同的kafka數(shù)據(jù)源,基于springboot自動(dòng)配置的思想,最終通過配置文件的配置,自動(dòng)生成生產(chǎn)者及消費(fèi)者的配置。

核心配置

自動(dòng)化配置類

import com.example.kafka.autoconfig.CustomKafkaDataSourceRegister;
import com.example.kafka.autoconfig.kafkaConsumerConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.SmartInstantiationAwareBeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@Configuration(
        proxyBeanMethods = false
)
@ConditionalOnWebApplication
@EnableConfigurationProperties({kafkaConsumerConfig.class})
@Import({CustomKafkaDataSourceRegister.class})
public class MyKafkaAutoConfiguration implements BeanFactoryAware, SmartInstantiationAwareBeanPostProcessor {
    public MyKafkaAutoConfiguration() {
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        beanFactory.getBean(CustomKafkaDataSourceRegister.class);
    }
}

注冊(cè)生產(chǎn)者、消費(fèi)者核心bean到spring

public void afterPropertiesSet() {
        Map<String, ConsumerConfigWrapper> factories = kafkaConsumerConfig.getFactories();
        if (factories != null && !factories.isEmpty()) {
            factories.forEach((factoryName, consumerConfig) -> {
                KafkaProperties.Listener listener = consumerConfig.getListener();
                Integer concurrency = consumerConfig.getConcurrency();
                // 創(chuàng)建監(jiān)聽容器工廠
                ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = createKafkaListenerContainerFactory(consumerConfig.buildProperties(), listener, concurrency);
                // 注冊(cè)到容器
                if (!beanFactory.containsBean(factoryName)) {
                    beanFactory.registerSingleton(factoryName, containerFactory);
                }
            });
        }
        Map<String, KafkaProperties.Producer> templates = kafkaProducerConfig.getTemplates();
        if (!ObjectUtils.isEmpty(templates)) {
            templates.forEach((templateName, producerConfig) -> {
                //registerBean(beanFactory, templateName, KafkaTemplate.class, propertyValues);
                //注冊(cè)spring bean的兩種方式
                registerBeanWithConstructor(beanFactory, templateName, KafkaTemplate.class, producerFactoryValues(producerConfig.buildProperties()));
            });
        }
    }

配置spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.kafka.MyKafkaAutoConfiguration

yml配置

spring:
  kafka:
    multiple:
      consumer:
        factories:
          test-factory:
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            bootstrap-servers: 192.168.56.112:9092
            group-id: group_a
            concurrency: 25
            fetch-min-size: 1048576
            fetch-max-wait: 3000
            listener:
              type: batch
            properties:
              spring-json-trusted-packages: '*'
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: latest
      producer:
        templates:
          test-template:
            bootstrap-servers: 192.168.56.112:9092
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

使用

在這里插入圖片描述

在這里插入圖片描述

到此這篇關(guān)于SpringBoot實(shí)現(xiàn)kafka多源配置的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot kafka多源配置內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 淺談Java中ABA問題及避免

    淺談Java中ABA問題及避免

    這篇文章主要介紹了淺談Java中ABA問題及避免,具有一定借鑒價(jià)值,需要的朋友可以參考下
    2018-01-01
  • Java中常用時(shí)間的一些相關(guān)方法

    Java中常用時(shí)間的一些相關(guān)方法

    日期的使用多種多樣,但萬變不離其宗,下面這篇文章主要給大家介紹了關(guān)于Java中常用時(shí)間的一些相關(guān)方法,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2021-10-10
  • Mybatis返回單個(gè)實(shí)體或者返回List的實(shí)現(xiàn)

    Mybatis返回單個(gè)實(shí)體或者返回List的實(shí)現(xiàn)

    這篇文章主要介紹了Mybatis返回單個(gè)實(shí)體或者返回List的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • Java多線程實(shí)戰(zhàn)之交叉打印的兩種方法

    Java多線程實(shí)戰(zhàn)之交叉打印的兩種方法

    今天小編就為大家分享一篇關(guān)于Java多線程實(shí)戰(zhàn)之交叉打印的兩種方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2019-02-02
  • java實(shí)現(xiàn)登錄之后抓取數(shù)據(jù)

    java實(shí)現(xiàn)登錄之后抓取數(shù)據(jù)

    這篇文章給大家分享了用JAVA實(shí)現(xiàn)在登陸以后抓取網(wǎng)站的數(shù)據(jù)的相關(guān)知識(shí),有興趣的朋友可以測(cè)試參考下。
    2018-07-07
  • SpringBoot如何根據(jù)目錄路徑生成接口的url路徑

    SpringBoot如何根據(jù)目錄路徑生成接口的url路徑

    這篇文章主要介紹了SpringBoot如何根據(jù)目錄路徑生成接口的url路徑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java使用ChatGPT的API詳解

    Java使用ChatGPT的API詳解

    OpenAI API 幾乎可以應(yīng)用于任何涉及理解或生成自然語言或代碼的任務(wù)。本文將帶你們介紹Java如何使用ChatGPT的API,感興趣的同學(xué)可以跟著小編一起來學(xué)習(xí)
    2023-04-04
  • Spring Boot應(yīng)用開發(fā)初探與實(shí)例講解

    Spring Boot應(yīng)用開發(fā)初探與實(shí)例講解

    這篇文章主要介紹了Spring Boot應(yīng)用開發(fā)初探與實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • SpringCloud中的Feign詳解

    SpringCloud中的Feign詳解

    這篇文章主要介紹了SpringCloud中的Feign詳解,Feign是一個(gè)聲明式的Web Service客戶端,以Java接口注解的方式調(diào)用Http請(qǐng)求,同時(shí)Feign整合了Ribbon和Hystrix,實(shí)現(xiàn)負(fù)載均衡與容斷功能,需要的朋友可以參考下
    2023-09-09
  • springboot項(xiàng)目docker分層構(gòu)建的配置方式

    springboot項(xiàng)目docker分層構(gòu)建的配置方式

    在使用dockerfile構(gòu)建springboot項(xiàng)目時(shí),速度較慢,用時(shí)比較長,為了加快構(gòu)建docker鏡像的速度,采用分層構(gòu)建的方式,這篇文章主要介紹了springboot項(xiàng)目docker分層構(gòu)建,需要的朋友可以參考下
    2024-03-03

最新評(píng)論