亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Kafka中的producer攔截器與consumer攔截器詳解

 更新時間:2023年12月11日 09:49:18   作者:warybee  
這篇文章主要介紹了Kafka中的producer攔截器與consumer攔截器詳解,Producer 的Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機會對消息做 一些定制化需求,比如修改消息等,需要的朋友可以參考下

1. producer 攔截器(interceptor)

1.1 介紹

Producer 的Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機會對消息做 一些定制化需求,比如修改消息等。Producer允許指定多個Interceptor按照指定順序作用于一條消 息從而形成一個攔截鏈(interceptor chain)。

自定義的攔截器(interceptor)需要實現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,接口定義如下:

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable {
    
    /**
      該方法封裝進KafkaProducer.send()方法中,方法會在消息發(fā)送之前被調(diào)用,用戶可以在該方法中對消息做任
       何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算
    */
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     該方法會在消息成功提交或發(fā)送失敗之后被調(diào)用,
     KafkaProducer.send()異步發(fā)送有回調(diào)通知 callback, onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。
    */
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    /**
     關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。
    */
    void close();
}
  • onSend() : 方法封裝進KafkaProducer.send()方法中,方法會在消息發(fā)送之前被調(diào)用,用戶可以在該方法中對消息做任
  • 何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算;
  • onAcknowledgement(): 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時,或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時調(diào)用此方法。KafkaProducer.send()異步發(fā)送有回調(diào)通知 callback, onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。
  • close(): 關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。

如果指定了多個Interceptor,則Producer將按照指定順序調(diào)用它們,如果interceptor出現(xiàn)異常Producer僅僅是捕獲每個 Interceptor拋出的異常記錄到錯誤日志中而非在向上傳遞。

1.2 案例

實現(xiàn)兩個攔截器(interceptor),組成攔截鏈。第一個攔截器在消息發(fā)送前,給消息添加header。第二個攔截器統(tǒng)計消息的發(fā)送成功數(shù)和失敗數(shù)。

PS:其實這兩個功能可以放在一個interceptor中,這里僅僅是為了演示多個interceptor。

定義攔截器

攔截器1:

public class MyInterceptor implements ProducerInterceptor<Integer,String> {
    /**
     * 消息發(fā)送之前調(diào)用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        //消息的主題
        String topic = record.topic();
        Integer partition = record.partition();
        Integer key = record.key();
        String value = record.value();
        Headers headers = record.headers();
        //給header添加時間戳
        String stamp = System.currentTimeMillis()+"";
        headers.add("timestamp",stamp.getBytes(StandardCharsets.UTF_8));
        ProducerRecord<Integer,String> resultRecord=
                  new ProducerRecord<>(topic,partition,key,value,headers);
        return resultRecord;
    }
    /**
     * 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時,或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時調(diào)用此方法
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

攔截器2:

public class MyInterceptor02 implements ProducerInterceptor<Integer,String> {
    private int errorNum=0;
    private int successNum=0;
    /**
     * 消息發(fā)送之前調(diào)用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        return record;
    }
    /**
     * 當(dāng)發(fā)送到服務(wù)器的記錄已被確認(rèn)時,或者當(dāng)發(fā)送記錄在發(fā)送到服務(wù)器之前失敗時調(diào)用此方法
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
         if (exception==null){
             successNum++;
         }else {
            errorNum++;
         }
    }
    @Override
    public void close() {
        System.out.println("消息發(fā)送成功數(shù): " + successNum);
        System.out.println("消息發(fā)送失敗數(shù): " + errorNum);
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

生產(chǎn)者

configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"攔截器類全路徑");,多個攔截器使用,分割

public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 設(shè)置key的序列化類
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 設(shè)置value的序列化類
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        //添加攔截器
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyInterceptor,com.warybee.interceptor.MyInterceptor02");
        KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs);
        //發(fā)送100條消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer,String> producerRecord=new ProducerRecord<>
                    (       "test_topic_1",
                            0,
                            i,
                            "test topic msg "+i);
            //消息的異步確認(rèn)
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception==null){
                        System.out.println("消息的主題:"+recordMetadata.topic());
                        System.out.println("消息的分區(qū):"+recordMetadata.partition());
                        System.out.println("消息的偏移量:"+recordMetadata.offset());
                    }else {
                        System.out.println("發(fā)送消息異常");
                    }
                }
            });
        }
        // 關(guān)閉生產(chǎn)者
        kafkaProducer.close();
    }
}

2 Consumer攔截器(interceptor)

2.1.介紹

消費者(Consumer)在拉取了分區(qū)消息之后,要首先經(jīng)過反序列化器對key和value進行反序列化處理,處理完之后,如果消費端設(shè)置了攔截器,則需要經(jīng)過攔截器的處理之后,才能返回給消費者應(yīng)用程 序進行處理。

  • ConsumerInterceptor允許攔截甚至更改消費者接收到的消息。
  • 常用在于將第三方組件引入 消費者應(yīng)用程序,用于定制的監(jiān)控、日志處理等。
  • ConsumerInterceptor方法拋出的異常會被捕獲、記錄,但是不會向下傳播。如果用戶配置了 錯誤的key或value類型參數(shù),消費者不會拋出異常,而僅僅是記錄下來。
  • 如果有多個攔截器,則該方法按照KafkaConsumer的configs中配置的順序調(diào)用。
  • 從調(diào)用 KafkaConsumer.poll(long) 的同一線程調(diào)用 ConsumerInterceptor 回調(diào)。

自定義的攔截器(interceptor)需要實現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor接口,接口定義如下:

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    /**
        該方法在poll方法返回之前調(diào)用。調(diào)用結(jié)束后poll方法就返回消息了。
        該方法可以修改消費者消息,返回新的消息。攔截器可以過濾收到的消息或生成新的消息。
     */
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    /**
      當(dāng)消費者提交偏移量時,調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。
      調(diào)用者將忽略此方法拋出的任何異常。
     */
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    /**
     *  關(guān)閉Interceptor之前調(diào)用
     */
    void close();
}

方法說明:

  • onConsume 該方法在poll方法返回之前調(diào)用。調(diào)用結(jié)束后poll方法就返回消息了。
  • onCommit 當(dāng)消費者提交偏移量時,調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。

2.2 案例

定義攔截器

public class MyConsumerInterceptor implements ConsumerInterceptor<Integer,String> {
    @Override
    public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> records) {
        //在這里可以對接收到的消息進行修改
        //如不做處理,直接返回即可
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp,offsetAndMetadata) -> {
            System.out.println(tp+" : "+offsetAndMetadata.offset());
        });
    }
    @Override
    public void close() {
    }
    /**
     * 用于獲取消費者的設(shè)置參數(shù)
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {
        configs.forEach((k, v) -> {
            System.out.println(k + "\t" + v);
        });
    }
}

消費者

在消費者客戶端配置中增加如下配置

如果有多個攔截器,用,分割即可

configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
        // 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化類
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化類
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
        //創(chuàng)建消費者對象
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_topic_1");
        //消費者訂閱主題
        consumer.subscribe(topics);
        while (true){
            //批量拉取主題消息,每3秒拉取一次
            ConsumerRecords<Integer, String> records = consumer.poll(3000);
            //變量消息
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println(record.topic() + "\t"
                        + record.partition() + "\t"
                        + record.offset() + "\t"
                        + record.key() + "\t"
                        + record.value());
            }
        }
    }
}

到此這篇關(guān)于Kafka中的producer攔截器與consumer攔截器詳解的文章就介紹到這了,更多相關(guān)producer攔截器與consumer攔截器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論