Java中的Kafka攔截器詳解
Kafka攔截器
Producer 攔截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于實(shí)現(xiàn) clients 端的定制化控制邏輯。
對(duì)于 producer 而言, interceptor 使得用戶在消息發(fā)送前以及 producer 回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求,比如修改消息等。
同時(shí), producer 允許用戶指定多個(gè) interceptor按序作用于同一條消息從而形成一個(gè)攔截鏈(interceptor chain)。
Intercetpor 的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
(1) configure(configs) 獲取配置信息和初始化數(shù)據(jù)時(shí)調(diào)用。
(2) onSend(ProducerRecord): 該方法封裝進(jìn) KafkaProducer.send 方法中,即它運(yùn)行在用戶主線程中。 Producer 確保在 消息被序列化以及計(jì)算分區(qū)前調(diào)用該方法。 用戶可以在該方法中對(duì)消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區(qū), 否則會(huì)影響目標(biāo)分區(qū)的計(jì)算。
(3) onAcknowledgement(RecordMetadata, Exception): 該方法會(huì)在消息從 RecordAccumulator 成功發(fā)送到 Kafka Broker 之后,或者在發(fā)送過程中失敗時(shí)調(diào)用。 并且通常都是在 producer 回調(diào)邏輯觸發(fā)之前。 onAcknowledgement 運(yùn)行在producer 的 IO 線程中,因此不要在該方法中放入很重的邏輯,否則會(huì)拖慢 producer 的消息發(fā)送效率。
(4) close: 關(guān)閉 interceptor,主要用于執(zhí)行一些資源清理工作如前所述, interceptor 可能被運(yùn)行在多個(gè)線程中,因此在具體實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全。另外倘若指定了多個(gè)interceptor,則 producer 將按照指定順序調(diào)用它們,并僅僅是捕獲每個(gè) interceptor 可能拋出的異常記錄到錯(cuò)誤日志中而非在向上傳遞。這在使用過程中要特別留意。
代碼演示
/** * @author WGR * @create 2021/8/9 -- 20:10 */ public class CounterInterceptor implements ProducerInterceptor<String, String> { private int errorCounter = 0; private int successCounter = 0; @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統(tǒng)計(jì)成功和失敗的次數(shù) if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結(jié)果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } @Override public void configure(Map<String, ?> configs) { } } /** * @author WGR * @create 2021/8/9 -- 20:07 */ public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創(chuàng)建一個(gè)新的 record,把時(shí)間戳寫入消息體的最前部 return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
把上面的攔截器加到producer 主程序
/** * @author WGR * @create 2021/8/9 -- 16:38 */ public class ProducerFastStart { //kafka集群地址 private static final String brokerList = "192.168.1.144:9092"; //主體名稱 private static final String topic = "dalianpai"; public static void main(String[] args) { Properties properties = new Properties(); //設(shè)置序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //設(shè)置重試次數(shù) properties.put(ProducerConfig.RETRIES_CONFIG,10); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList); // 2 構(gòu)建攔截鏈 List<String> interceptors = new ArrayList<>(); interceptors.add(CounterInterceptor.class.getName()); interceptors.add(TimeInterceptor.class.getName()); properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // 3 發(fā)送消息 for (int i = 0; i < 11; i++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!"+i); producer.send(record); } producer.close(); } }
消息結(jié)果
到此這篇關(guān)于Java中的Kafka攔截器詳解的文章就介紹到這了,更多相關(guān)Kafka攔截器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java使用Jdbc連接Oracle執(zhí)行簡(jiǎn)單查詢操作示例
這篇文章主要介紹了Java使用Jdbc連接Oracle執(zhí)行簡(jiǎn)單查詢操作,結(jié)合實(shí)例形式詳細(xì)分析了java基于jdbc實(shí)現(xiàn)Oracle數(shù)據(jù)庫的連接與查詢相關(guān)操作技巧,需要的朋友可以參考下2019-09-09淺談Storm在zookeeper上的目錄結(jié)構(gòu)
這篇文章主要介紹了淺談Storm在zookeeper上的目錄結(jié)構(gòu)的相關(guān)內(nèi)容,涉及storm使用zookeeper的操作以及詳細(xì)結(jié)構(gòu)圖,具有一定參考價(jià)值,需要的朋友可以了解下。2017-10-10基于mybatis 動(dòng)態(tài)SQL查詢總結(jié)
這篇文章主要介紹了mybatis 動(dòng)態(tài)SQL查詢總結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Windows10系統(tǒng)下JDK1.8環(huán)境變量的配置
今天帶大家學(xué)習(xí)在Windows10系統(tǒng)下怎么配置JDK1.8環(huán)境變量,文中有非常詳細(xì)的安裝及配置教程,對(duì)正在學(xué)習(xí)的小伙伴們很有幫助,需要的朋友可以參考下2021-05-05使用ResponseEntity作為的返回值的應(yīng)用
這篇文章主要介紹了使用ResponseEntity作為的返回值的應(yīng)用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07關(guān)于log4j日志擴(kuò)展---自定義PatternLayout
這篇文章主要介紹了關(guān)于log4j日志擴(kuò)展---自定義PatternLayout,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12SpringBoot使用MockMvc進(jìn)行單元測(cè)試的實(shí)例代碼
在Spring Boot應(yīng)用程序中,使用MockMvc進(jìn)行單元測(cè)試是一種有效的方式,可以驗(yàn)證控制器的行為和API的正確性,在這篇博客中,我們將介紹如何使用MockMvc對(duì)用戶控制器進(jìn)行測(cè)試,感興趣的朋友可以參考下2024-01-01Springboot+Flowable?快速實(shí)現(xiàn)工作流的開發(fā)流程
這篇文章主要介紹了Springboot+Flowable?快速實(shí)現(xiàn)工作流的開發(fā)流程,本文通過實(shí)例代碼圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-02-02