SpringBoot集成Kafka 配置工具類(lèi)的詳細(xì)代碼
spring-kafka 是基于 java版的 kafka client與spring的集成,提供了 KafkaTemplate,封裝了各種方法,方便操作,它封裝了apache的kafka-client,不需要再導(dǎo)入client依賴(lài)
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
YML配置
kafka: #bootstrap-servers: server1:9092,server2:9093 #kafka開(kāi)發(fā)地址, #生產(chǎn)者配置 producer: # Kafka提供的序列化和反序列化類(lèi) key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 1 # 消息發(fā)送重試次數(shù) #acks = 0:設(shè)置成 表示 producer 完全不理睬 leader broker 端的處理結(jié)果。此時(shí)producer 發(fā)送消息后立即開(kāi)啟下 條消息的發(fā)送,根本不等待 leader broker 端返回結(jié)果 #acks= all 或者-1 :表示當(dāng)發(fā)送消息時(shí), leader broker 不僅會(huì)將消息寫(xiě)入本地日志,同時(shí)還會(huì)等待所有其他副本都成功寫(xiě)入它們各自的本地日志后,才發(fā)送響應(yīng)結(jié)果給,消息安全但是吞吐量會(huì)比較低。 #acks = 1:默認(rèn)的參數(shù)值。 producer 發(fā)送消息后 leader broker 僅將該消息寫(xiě)入本地日志,然后便發(fā)送響應(yīng)結(jié)果給producer ,而無(wú)須等待其他副本寫(xiě)入該消息。折中方案,只要leader一直活著消息就不會(huì)丟失,同時(shí)也保證了吞吐量 acks: 1 #應(yīng)答級(jí)別:多少個(gè)分區(qū)副本備份完成時(shí)向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1) batch-size: 16384 #批量大小 properties: linger: ms: 0 #提交延遲 buffer-memory: 33554432 # 生產(chǎn)端緩沖區(qū)大小 # 消費(fèi)者配置 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 分組名稱(chēng) group-id: web enable-auto-commit: false #提交offset延時(shí)(接收到消息后多久提交offset) # auto-commit-interval: 1000ms #當(dāng)kafka中沒(méi)有初始o(jì)ffset或offset超出范圍時(shí)將自動(dòng)重置offset # earliest:重置為分區(qū)中最小的offset; # latest:重置為分區(qū)中最新的offset(消費(fèi)分區(qū)中新產(chǎn)生的數(shù)據(jù)); # none:只要有一個(gè)分區(qū)不存在已提交的offset,就拋出異常; auto-offset-reset: latest properties: #消費(fèi)會(huì)話(huà)超時(shí)時(shí)間(超過(guò)這個(gè)時(shí)間consumer沒(méi)有發(fā)送心跳,就會(huì)觸發(fā)rebalance操作) session.timeout.ms: 15000 #消費(fèi)請(qǐng)求超時(shí)時(shí)間 request.timeout.ms: 18000 #批量消費(fèi)每次最多消費(fèi)多少條消息 #每次拉取一條,一條條消費(fèi),當(dāng)然是具體業(yè)務(wù)狀況設(shè)置 max-poll-records: 1 # 指定心跳包發(fā)送頻率,即間隔多長(zhǎng)時(shí)間發(fā)送一次心跳包,優(yōu)化該值的設(shè)置可以減少Rebalance操作,默認(rèn)時(shí)間為3秒; heartbeat-interval: 6000 # 發(fā)出請(qǐng)求時(shí)傳遞給服務(wù)器的 ID。用于服務(wù)器端日志記錄 正常使用后解開(kāi)注釋?zhuān)蝗恢挥幸粋€(gè)節(jié)點(diǎn)會(huì)報(bào)錯(cuò) #client-id: mqtt listener: #消費(fèi)端監(jiān)聽(tīng)的topic不存在時(shí),項(xiàng)目啟動(dòng)會(huì)報(bào)錯(cuò)(關(guān)掉) missing-topics-fatal: false #設(shè)置消費(fèi)類(lèi)型 批量消費(fèi) batch,單條消費(fèi):single type: single #指定容器的線(xiàn)程數(shù),提高并發(fā)量 #concurrency: 3 #手動(dòng)提交偏移量 manual達(dá)到一定數(shù)據(jù)后批量提交 #ack-mode: manual ack-mode: MANUAL_IMMEDIATE #手動(dòng)確認(rèn)消息 # 認(rèn)證 #properties: #security: #protocol: SASL_PLAINTEXT #sasl: #mechanism: SCRAM-SHA-256 #jaas:config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
簡(jiǎn)單工具類(lèi),能滿(mǎn)足正常使用,主題是無(wú)法修改的
@Component @Slf4j public class KafkaUtils<K, V> { @Autowired private KafkaTemplate kafkaTemplate; @Value("${spring.kafka.bootstrap-servers}") String[] servers; /** * 獲取連接 * @return */ private Admin getAdmin() { Properties properties = new Properties(); properties.put("bootstrap.servers", servers); // 正式環(huán)境需要添加賬號(hào)密碼 return Admin.create(properties); } /** * 增加topic * * @param name 主題名字 * @param partition 分區(qū)數(shù)量 * @param replica 副本數(shù)量 * @date 2022-06-23 chens */ public R addTopic(String name, Integer partition, Integer replica) { Admin admin = getAdmin(); if (replica > servers.length) { return R.error("副本數(shù)量不允許超過(guò)Broker數(shù)量"); } try { NewTopic topic = new NewTopic(name, partition, Short.parseShort(replica.toString())); admin.createTopics(Collections.singleton(topic)); } finally { admin.close(); } return R.ok(); } /** * 刪除主題 * * @param names 主題名字集合 * @date 2022-06-23 chens */ public void deleteTopic(List<String> names) { Admin admin = getAdmin(); try { admin.deleteTopics(names); } finally { admin.close(); } } /** * 查詢(xún)所有主題 * * @date 2022-06-24 chens */ public Set<String> queryTopic() { Admin admin = getAdmin(); try { ListTopicsResult topics = admin.listTopics(); Set<String> set = topics.names().get(); return set; } catch (Exception e) { log.error("查詢(xún)主題錯(cuò)誤!"); } finally { admin.close(); } return null; } // 向所有分區(qū)發(fā)送消息 public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { return kafkaTemplate.send(topic, data); } // 指定key發(fā)送消息,相同key保證消息在同一個(gè)分區(qū) public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) { return kafkaTemplate.send(topic, key, data); } // 指定分區(qū)和key發(fā)送。 public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) { return kafkaTemplate.send(topic, partition, key, data); } }
發(fā)送消息 使用異步
@GetMapping("/{topic}") public String test(@PathVariable String topic, @PathVariable Long index) throws ExecutionException, InterruptedException { ListenableFuture future = null; Chenshuang user = new Chenshuang(i, "陳爽", "123456", new Date()); String s = JSON.toJSONString(user); KafkaUtils utils = new KafkaUtils(); future = kafkaUtils.send(topic, s); // 異步回調(diào),同步get,會(huì)等待 不推薦同步! future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { System.out.println("發(fā)送失敗"); } @Override public void onSuccess(Object result) { System.out.println("發(fā)送成功:" + result); } }); return "發(fā)送成功"; }
建立主題
如果broker端配置auto.create.topics.enable為true(默認(rèn)為true),當(dāng)收到客戶(hù)端的元數(shù)據(jù)請(qǐng)求時(shí)則會(huì)創(chuàng)建topic。
向一個(gè)不存在的主題發(fā)送和消費(fèi)都會(huì)創(chuàng)建一個(gè)新的主題,很多時(shí)候,非預(yù)期的創(chuàng)建主題,會(huì)導(dǎo)致很多意想不到的問(wèn)題,建議關(guān)掉該特性。
Topic主題用來(lái)區(qū)分不同類(lèi)型的消息,實(shí)際也就是適用于不同的業(yè)務(wù)場(chǎng)景,默認(rèn)消息保存一周時(shí)間;
同一個(gè)Topic主題下,默認(rèn)是一個(gè)partition分區(qū),也就是只能有一個(gè)消費(fèi)者來(lái)消費(fèi),如果想提升消費(fèi)能力,就需要增加分區(qū);
同一個(gè)Topic的多個(gè)分區(qū),可以有三種方式分派消息(key,value)到不同的分區(qū),指定分區(qū)、HASH路由、默認(rèn),同一個(gè)分區(qū)內(nèi)的消息ID唯一,并順序;
消費(fèi)者消費(fèi)partition分區(qū)內(nèi)的消息時(shí),是通過(guò)offsert來(lái)標(biāo)識(shí)消息的位置;
GroupId用來(lái)解決同一個(gè)Topic主題下重復(fù)消費(fèi)問(wèn)題,比如一條消費(fèi)需要多個(gè)消費(fèi)者接收到,就可以通過(guò)設(shè)置不同的GroupId實(shí)現(xiàn),
實(shí)際消息是存一份的,只是通過(guò)邏輯上設(shè)置標(biāo)識(shí)來(lái)區(qū)分,系統(tǒng)會(huì)記錄Topic主題下–》GroupId分組下–》partition分區(qū)下的offsert,來(lái)標(biāo)識(shí)是否消費(fèi)過(guò)。
發(fā)送消息的高可用—
集群模式,多副本方式實(shí)現(xiàn);一條消息的提交,可能通過(guò)設(shè)置acks標(biāo)識(shí)實(shí)現(xiàn)不同的可用性,=0時(shí),發(fā)送成功就OK;=1時(shí),master成功響應(yīng)才OK,=all時(shí),一半以上的響應(yīng)才OK(真正的高可用)
消費(fèi)消息的高可用—
可以關(guān)閉自動(dòng)標(biāo)識(shí)offsert模式,先拉取消息,消費(fèi)完成后,再去設(shè)置offsert位置,來(lái)解決消費(fèi)高可用
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaTopic { // yml自定義主題,項(xiàng)目啟動(dòng)就創(chuàng)建, @Value("${spring.kafka.topic}") String topic; @Value("${spring.kafka.bootstrap-servers}") String[] server; /** * 項(xiàng)目啟動(dòng) 初始化主題,如果存在不會(huì)覆蓋主題的 */ @Bean public NewTopic batchTopic() { // 最大復(fù)制因子 <= 經(jīng)紀(jì)人broker數(shù)量. return new NewTopic(topic, 10, (short) server.length); } }
監(jiān)聽(tīng)類(lèi) ,一條消息,各分組內(nèi)的消費(fèi)者只有一個(gè)消費(fèi)者消費(fèi)一次,如果消息在1區(qū),指定分區(qū)1監(jiān)聽(tīng)也會(huì)消費(fèi)
也可以同個(gè)方法監(jiān)聽(tīng)不同的主題,指定位移監(jiān)聽(tīng)
同組會(huì)均勻消費(fèi),不同組會(huì)重復(fù)消費(fèi)。
1、單播模式,只有一個(gè)消費(fèi)者組
(1)topic只有1個(gè)partition,該組內(nèi)有多個(gè)消費(fèi)者時(shí),此時(shí)同一個(gè)partition內(nèi)的消息只能被該組中的一 個(gè)consumer消費(fèi)。當(dāng)消費(fèi)者數(shù)量多于partition數(shù)量時(shí),多余的消費(fèi)者是處于空閑狀態(tài)的,如圖1所示。topic,test只有一個(gè)partition,并且只有1個(gè)group,G1,該group內(nèi)有多個(gè)consumer,只能被其中一個(gè)消費(fèi)者消費(fèi),其他的處于空閑狀態(tài)。
圖一
(2)該topic有多個(gè)partition,該組內(nèi)有多個(gè)消費(fèi)者,比如test 有3個(gè)partition,該組內(nèi)有2個(gè)消費(fèi)者,那么可能就是C0對(duì)應(yīng)消費(fèi)p0,p1內(nèi)的數(shù)據(jù),c1對(duì)應(yīng)消費(fèi)p2的數(shù)據(jù);如果有3個(gè)消費(fèi)者,就是一個(gè)消費(fèi)者對(duì)應(yīng)消費(fèi)一個(gè)partition內(nèi)的數(shù)據(jù)了。圖解分別如圖2,圖3.這種模式在集群模式下使用是非常普遍的,比如我們可以起3個(gè)服務(wù),對(duì)應(yīng)的topic設(shè)置3個(gè)partiition,這樣就可以實(shí)現(xiàn)并行消費(fèi),大大提高處理消息的效率。
圖二
圖三
2、廣播模式,多個(gè)消費(fèi)者組
如果想實(shí)現(xiàn)廣播的模式就需要設(shè)置多個(gè)消費(fèi)者組,這樣當(dāng)一個(gè)消費(fèi)者組消費(fèi)完這個(gè)消息后,絲毫不影響其他組內(nèi)的消費(fèi)者進(jìn)行消費(fèi),這就是廣播的概念。
(1)多個(gè)消費(fèi)者組,1個(gè)partition
該topic內(nèi)的數(shù)據(jù)被多個(gè)消費(fèi)者組同時(shí)消費(fèi),當(dāng)某個(gè)消費(fèi)者組有多個(gè)消費(fèi)者時(shí)也只能被一個(gè)消費(fèi)者消費(fèi),如圖4所示:
圖四
(2)多個(gè)消費(fèi)者組,多個(gè)partition
該topic內(nèi)的數(shù)據(jù)可被多個(gè)消費(fèi)者組多次消費(fèi),在一個(gè)消費(fèi)者組內(nèi),每個(gè)消費(fèi)者又可對(duì)應(yīng)該topic內(nèi)的一個(gè)或者多個(gè)partition并行消費(fèi),如圖五:
注意: 消費(fèi)者的數(shù)量并不能決定一個(gè)topic的并行度。它是由分區(qū)的數(shù)目決定的。
再多的消費(fèi)者,分區(qū)數(shù)少,也是浪費(fèi)!
一個(gè)組的最大并行度將等于該主題的分區(qū)數(shù)。
@Component @Slf4j public class Consumer { // 監(jiān)聽(tīng)主題 分組a @KafkaListener(topics =("${spring.kafka.topic}") ,groupId = "a") public void getMessage(ConsumerRecord message, Acknowledgment ack) { //確認(rèn)收到消息 ack.acknowledge(); } // 監(jiān)聽(tīng)主題 分組a @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "a") public void getMessage2(ConsumerRecord message, Acknowledgment ack) { //確認(rèn)收到消息 ack.acknowledge(); } // 監(jiān)聽(tīng)主題 分組b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage3(ConsumerRecord message, Acknowledgment ack) { //確認(rèn)收到消息//確認(rèn)收到消息 ack.acknowledge(); } // 監(jiān)聽(tīng)主題 分組b @KafkaListener(topics = ("${spring.kafka.topic}"),groupId = "b") public void getMessage4(ConsumerRecord message, Acknowledgment ack) { //確認(rèn)收到消息//確認(rèn)收到消息 ack.acknowledge(); } // 指定監(jiān)聽(tīng)分區(qū)1的消息 @KafkaListener(topicPartitions = {@TopicPartition(topic = ("${spring.kafka.topic}"),partitions = {"1"})}) public void getMessage5(ConsumerRecord message, Acknowledgment ack) { Long id = JSONObject.parseObject(message.value().toString()).getLong("id"); //確認(rèn)收到消息//確認(rèn)收到消息 ack.acknowledge(); } /** * @Title 指定topic、partition、offset消費(fèi) * @Description 同時(shí)監(jiān)聽(tīng)topic1和topic2,監(jiān)聽(tīng)topic1的0號(hào)分區(qū)、topic2的 "0號(hào)和1號(hào)" 分區(qū),指向1號(hào)分區(qū)的offset初始值為8 * 注意:topics和topicPartitions不能同時(shí)使用; **/ @KafkaListener(id = "c1",groupId = "c",topicPartitions = { @TopicPartition(topic = "t1", partitions = { "0" }), @TopicPartition(topic = "t2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))}) public void getMessage6(ConsumerRecord record,Acknowledgment ack) { //確認(rèn)收到消息 ack.acknowledge(); } /** * 批量消費(fèi)監(jiān)聽(tīng)goods變更消息 * yml配置listener:type 要改為batch * ymk配置consumer:max-poll-records: ??(每次拉取多少條數(shù)據(jù)消費(fèi)) * concurrency = "2" 啟動(dòng)多少線(xiàn)程執(zhí)行,應(yīng)小于等于broker數(shù)量,避免資源浪費(fèi) */ @KafkaListener(id="sync-modify-goods", topics = "${spring.kafka.topic}",concurrency = "4") public void getMessage7(List<ConsumerRecord<String, String>> records){ for (ConsumerRecord<String, String> msg:records) { GoodsChangeMsg changeMsg = null; try { changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class); syncGoodsProcessor.handle(changeMsg); }catch (Exception exception) { log.error("解析失敗{}", msg, exception); } } } }
到此這篇關(guān)于SpringBoot集成Kafka 配置工具類(lèi)的文章就介紹到這了,更多相關(guān)SpringBoot集成Kafka 配置工具類(lèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
PowerJob的HashedWheelTimer工作流程源碼解讀
這篇文章主要為大家介紹了PowerJob的HashedWheelTimer工作流程源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01java騰訊AI人臉對(duì)比對(duì)接代碼實(shí)例
這篇文章主要介紹了java騰訊AI人臉對(duì)比對(duì)接,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03Java常用數(shù)字工具類(lèi) 大數(shù)乘法、加法、減法運(yùn)算(2)
這篇文章主要為大家詳細(xì)介紹了Java常用數(shù)字工具類(lèi),大數(shù)乘法、加法、減法運(yùn)算,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-052024版本IDEA創(chuàng)建Servlet模板的圖文教程
新版IDEA?2024.1.4中,用戶(hù)需要自行創(chuàng)建Servlet模板以解決Web項(xiàng)目無(wú)法通過(guò)右鍵創(chuàng)建Servlet的問(wèn)題,本文詳細(xì)介紹了添加ServletAnnotatedClass.java模板的步驟,幫助用戶(hù)快速配置并使用新的Servlet模板,需要的朋友可以參考下2024-10-10詳解如何利用jasypt實(shí)現(xiàn)配置文件加密
Jasypt?(Java?Simplified?Encryption)?是一個(gè)?java?庫(kù),它允許開(kāi)發(fā)人員以最小的成本將基本的加密功能添加到項(xiàng)目中,而無(wú)需深入了解密碼學(xué)的工作原理。本文將利用jasypt實(shí)現(xiàn)配置文件加密,感興趣的可以學(xué)習(xí)一下2022-07-07