Kafka中的producer攔截器與consumer攔截器詳解
1. producer 攔截器(interceptor)
1.1 介紹
Producer 的Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機會對消息做 一些定制化需求,比如修改消息等。Producer允許指定多個Interceptor按照指定順序作用于一條消 息從而形成一個攔截鏈(interceptor chain)。
自定義的攔截器(interceptor)需要實現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,接口定義如下:
package org.apache.kafka.clients.producer; import org.apache.kafka.common.Configurable; public interface ProducerInterceptor<K, V> extends Configurable { /** 該方法封裝進KafkaProducer.send()方法中,方法會在消息發(fā)送之前被調(diào)用,用戶可以在該方法中對消息做任 何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算 */ ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); /** 該方法會在消息成功提交或發(fā)送失敗之后被調(diào)用, KafkaProducer.send()異步發(fā)送有回調(diào)通知 callback, onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。 */ void onAcknowledgement(RecordMetadata metadata, Exception exception); /** 關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。 */ void close(); }
- onSend() : 方法封裝進KafkaProducer.send()方法中,方法會在消息發(fā)送之前被調(diào)用,用戶可以在該方法中對消息做任
- 何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算;
- onAcknowledgement(): 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時,或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時調(diào)用此方法。KafkaProducer.send()異步發(fā)送有回調(diào)通知 callback, onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。
- close(): 關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。
如果指定了多個Interceptor,則Producer將按照指定順序調(diào)用它們,如果interceptor出現(xiàn)異常Producer僅僅是捕獲每個 Interceptor拋出的異常記錄到錯誤日志中而非在向上傳遞。
1.2 案例
實現(xiàn)兩個攔截器(interceptor),組成攔截鏈。第一個攔截器在消息發(fā)送前,給消息添加header。第二個攔截器統(tǒng)計消息的發(fā)送成功數(shù)和失敗數(shù)。
PS:其實這兩個功能可以放在一個interceptor中,這里僅僅是為了演示多個interceptor。
定義攔截器
攔截器1:
public class MyInterceptor implements ProducerInterceptor<Integer,String> { /** * 消息發(fā)送之前調(diào)用 * @param record * @return */ @Override public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) { //消息的主題 String topic = record.topic(); Integer partition = record.partition(); Integer key = record.key(); String value = record.value(); Headers headers = record.headers(); //給header添加時間戳 String stamp = System.currentTimeMillis()+""; headers.add("timestamp",stamp.getBytes(StandardCharsets.UTF_8)); ProducerRecord<Integer,String> resultRecord= new ProducerRecord<>(topic,partition,key,value,headers); return resultRecord; } /** * 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時,或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時調(diào)用此方法 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
攔截器2:
public class MyInterceptor02 implements ProducerInterceptor<Integer,String> { private int errorNum=0; private int successNum=0; /** * 消息發(fā)送之前調(diào)用 * @param record * @return */ @Override public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) { return record; } /** * 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時,或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時調(diào)用此方法 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (exception==null){ successNum++; }else { errorNum++; } } @Override public void close() { System.out.println("消息發(fā)送成功數(shù): " + successNum); System.out.println("消息發(fā)送失敗數(shù): " + errorNum); } @Override public void configure(Map<String, ?> configs) { } }
生產(chǎn)者
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"攔截器類全路徑");,多個攔截器使用,分割
public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); // 設(shè)置key的序列化類 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); // 設(shè)置value的序列化類 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configs.put(ProducerConfig.ACKS_CONFIG,"all"); //添加攔截器 configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyInterceptor,com.warybee.interceptor.MyInterceptor02"); KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs); //發(fā)送100條消息 for (int i = 0; i < 100; i++) { ProducerRecord<Integer,String> producerRecord=new ProducerRecord<> ( "test_topic_1", 0, i, "test topic msg "+i); //消息的異步確認(rèn) kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception==null){ System.out.println("消息的主題:"+recordMetadata.topic()); System.out.println("消息的分區(qū):"+recordMetadata.partition()); System.out.println("消息的偏移量:"+recordMetadata.offset()); }else { System.out.println("發(fā)送消息異常"); } } }); } // 關(guān)閉生產(chǎn)者 kafkaProducer.close(); } }
2 Consumer攔截器(interceptor)
2.1.介紹
消費者(Consumer)在拉取了分區(qū)消息之后,要首先經(jīng)過反序列化器對key和value進行反序列化處理,處理完之后,如果消費端設(shè)置了攔截器,則需要經(jīng)過攔截器的處理之后,才能返回給消費者應(yīng)用程 序進行處理。
- ConsumerInterceptor允許攔截甚至更改消費者接收到的消息。
- 常用在于將第三方組件引入 消費者應(yīng)用程序,用于定制的監(jiān)控、日志處理等。
- ConsumerInterceptor方法拋出的異常會被捕獲、記錄,但是不會向下傳播。如果用戶配置了 錯誤的key或value類型參數(shù),消費者不會拋出異常,而僅僅是記錄下來。
- 如果有多個攔截器,則該方法按照KafkaConsumer的configs中配置的順序調(diào)用。
- 從調(diào)用 KafkaConsumer.poll(long) 的同一線程調(diào)用 ConsumerInterceptor 回調(diào)。
自定義的攔截器(interceptor)需要實現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor接口,接口定義如下:
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable { /** 該方法在poll方法返回之前調(diào)用。調(diào)用結(jié)束后poll方法就返回消息了。 該方法可以修改消費者消息,返回新的消息。攔截器可以過濾收到的消息或生成新的消息。 */ ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records); /** 當(dāng)消費者提交偏移量時,調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。 調(diào)用者將忽略此方法拋出的任何異常。 */ void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); /** * 關(guān)閉Interceptor之前調(diào)用 */ void close(); }
方法說明:
- onConsume 該方法在poll方法返回之前調(diào)用。調(diào)用結(jié)束后poll方法就返回消息了。
- onCommit 當(dāng)消費者提交偏移量時,調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。
2.2 案例
定義攔截器
public class MyConsumerInterceptor implements ConsumerInterceptor<Integer,String> { @Override public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> records) { //在這里可以對接收到的消息進行修改 //如不做處理,直接返回即可 return records; } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.forEach((tp,offsetAndMetadata) -> { System.out.println(tp+" : "+offsetAndMetadata.offset()); }); } @Override public void close() { } /** * 用于獲取消費者的設(shè)置參數(shù) * @param configs */ @Override public void configure(Map<String, ?> configs) { configs.forEach((k, v) -> { System.out.println(k + "\t" + v); }); } }
消費者
在消費者客戶端配置中增加如下配置
如果有多個攔截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
public class KafkaConsumerDemo { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 // 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); //KEY反序列化類 configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //value反序列化類 configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor"); //創(chuàng)建消費者對象 KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs); List<String> topics = new ArrayList<>(); topics.add("test_topic_1"); //消費者訂閱主題 consumer.subscribe(topics); while (true){ //批量拉取主題消息,每3秒拉取一次 ConsumerRecords<Integer, String> records = consumer.poll(3000); //變量消息 for (ConsumerRecord<Integer, String> record : records) { System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value()); } } } }
到此這篇關(guān)于Kafka中的producer攔截器與consumer攔截器詳解的文章就介紹到這了,更多相關(guān)producer攔截器與consumer攔截器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Cloud體系實現(xiàn)標(biāo)簽路由的方法示例
這篇文章主要介紹了Spring Cloud體系實現(xiàn)標(biāo)簽路由的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-05-05springboot中的Application.properties常用配置
這篇文章主要介紹了springboot中的Application.properties常用配置,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05SSM框架下如何實現(xiàn)數(shù)據(jù)從后臺傳輸?shù)角芭_
這篇文章主要介紹了SSM框架下如何實現(xiàn)數(shù)據(jù)從后臺傳輸?shù)角芭_,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問題總結(jié)及解決
這篇文章主要介紹了java 使用ImageIO.writer從BufferedImage生成jpeg圖像遇到問題總結(jié)及解決的相關(guān)資料,需要的朋友可以參考下2017-03-03spring事務(wù)的REQUIRES_NEW源碼示例解析
這篇文章主要為大家介紹了spring事務(wù)的REQUIRES_NEW源碼示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09