亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java中的Kafka攔截器詳解

 更新時(shí)間:2023年11月21日 10:04:48   作者:dalianpai  
這篇文章主要介紹了Java中的Kafka攔截器詳解,Producer?攔截器(interceptor)是在?Kafka?0.10?版本被引入的,主要用于實(shí)現(xiàn)?clients?端的定制化控制邏輯,需要的朋友可以參考下

Kafka攔截器

Producer 攔截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于實(shí)現(xiàn) clients 端的定制化控制邏輯。

對(duì)于 producer 而言, interceptor 使得用戶在消息發(fā)送前以及 producer 回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求,比如修改消息等。

同時(shí), producer 允許用戶指定多個(gè) interceptor按序作用于同一條消息從而形成一個(gè)攔截鏈(interceptor chain)。

Intercetpor 的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

(1) configure(configs) 獲取配置信息和初始化數(shù)據(jù)時(shí)調(diào)用。

(2) onSend(ProducerRecord): 該方法封裝進(jìn) KafkaProducer.send 方法中,即它運(yùn)行在用戶主線程中。 Producer 確保在 消息被序列化以及計(jì)算分區(qū)前調(diào)用該方法。 用戶可以在該方法中對(duì)消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區(qū), 否則會(huì)影響目標(biāo)分區(qū)的計(jì)算。

(3) onAcknowledgement(RecordMetadata, Exception): 該方法會(huì)在消息從 RecordAccumulator 成功發(fā)送到 Kafka Broker 之后,或者在發(fā)送過程中失敗時(shí)調(diào)用。 并且通常都是在 producer 回調(diào)邏輯觸發(fā)之前。 onAcknowledgement 運(yùn)行在producer 的 IO 線程中,因此不要在該方法中放入很重的邏輯,否則會(huì)拖慢 producer 的消息發(fā)送效率。

(4) close: 關(guān)閉 interceptor,主要用于執(zhí)行一些資源清理工作如前所述, interceptor 可能被運(yùn)行在多個(gè)線程中,因此在具體實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全。另外倘若指定了多個(gè)interceptor,則 producer 將按照指定順序調(diào)用它們,并僅僅是捕獲每個(gè) interceptor 可能拋出的異常記錄到錯(cuò)誤日志中而非在向上傳遞。這在使用過程中要特別留意。

代碼演示

/**
 * @author WGR
 * @create 2021/8/9 -- 20:10
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 統(tǒng)計(jì)成功和失敗的次數(shù)
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }
 
    @Override
    public void close() {
        // 保存結(jié)果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
}
 
/**
 * @author WGR
 * @create 2021/8/9 -- 20:07
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 創(chuàng)建一個(gè)新的 record,把時(shí)間戳寫入消息體的最前部
        return new ProducerRecord(record.topic(),
                record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "," + record.value().toString());
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
 
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
}

把上面的攔截器加到producer 主程序

/**
 * @author WGR
 * @create 2021/8/9 -- 16:38
 */
public class ProducerFastStart {
 
    //kafka集群地址
    private static final String brokerList = "192.168.1.144:9092";
    //主體名稱
    private static final String topic = "dalianpai";
 
    public static void main(String[] args) {
        Properties properties = new Properties();
 
        //設(shè)置序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //設(shè)置重試次數(shù)
        properties.put(ProducerConfig.RETRIES_CONFIG,10);
 
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
 
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        // 2 構(gòu)建攔截鏈
        List<String> interceptors = new ArrayList<>();
        interceptors.add(CounterInterceptor.class.getName());
        interceptors.add(TimeInterceptor.class.getName());
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
 
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3 發(fā)送消息
        for (int i = 0; i < 11; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!"+i);
            producer.send(record);
        }
 
        producer.close();
    }
}

消息結(jié)果

image-20210809203643520

到此這篇關(guān)于Java中的Kafka攔截器詳解的文章就介紹到這了,更多相關(guān)Kafka攔截器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java使用Jdbc連接Oracle執(zhí)行簡(jiǎn)單查詢操作示例

    Java使用Jdbc連接Oracle執(zhí)行簡(jiǎn)單查詢操作示例

    這篇文章主要介紹了Java使用Jdbc連接Oracle執(zhí)行簡(jiǎn)單查詢操作,結(jié)合實(shí)例形式詳細(xì)分析了java基于jdbc實(shí)現(xiàn)Oracle數(shù)據(jù)庫的連接與查詢相關(guān)操作技巧,需要的朋友可以參考下
    2019-09-09
  • Java處理壓縮文件的步驟詳解

    Java處理壓縮文件的步驟詳解

    在Java編程環(huán)境中,處理zip壓縮文件是一項(xiàng)常見的任務(wù),特別是在數(shù)據(jù)傳輸、備份或者打包應(yīng)用程序時(shí),本文將詳細(xì)講解Java處理壓縮文件的步驟,并有相關(guān)的代碼示例供大家參考,需要的朋友可以參考下
    2024-10-10
  • 淺談Storm在zookeeper上的目錄結(jié)構(gòu)

    淺談Storm在zookeeper上的目錄結(jié)構(gòu)

    這篇文章主要介紹了淺談Storm在zookeeper上的目錄結(jié)構(gòu)的相關(guān)內(nèi)容,涉及storm使用zookeeper的操作以及詳細(xì)結(jié)構(gòu)圖,具有一定參考價(jià)值,需要的朋友可以了解下。
    2017-10-10
  • 淺談Java中是否直接可以使用enum進(jìn)行傳輸

    淺談Java中是否直接可以使用enum進(jìn)行傳輸

    這篇文章主要介紹了淺談Java中是否直接可以使用enum進(jìn)行傳輸,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • 基于mybatis 動(dòng)態(tài)SQL查詢總結(jié)

    基于mybatis 動(dòng)態(tài)SQL查詢總結(jié)

    這篇文章主要介紹了mybatis 動(dòng)態(tài)SQL查詢總結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Windows10系統(tǒng)下JDK1.8環(huán)境變量的配置

    Windows10系統(tǒng)下JDK1.8環(huán)境變量的配置

    今天帶大家學(xué)習(xí)在Windows10系統(tǒng)下怎么配置JDK1.8環(huán)境變量,文中有非常詳細(xì)的安裝及配置教程,對(duì)正在學(xué)習(xí)的小伙伴們很有幫助,需要的朋友可以參考下
    2021-05-05
  • 使用ResponseEntity作為的返回值的應(yīng)用

    使用ResponseEntity作為的返回值的應(yīng)用

    這篇文章主要介紹了使用ResponseEntity作為的返回值的應(yīng)用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • 關(guān)于log4j日志擴(kuò)展---自定義PatternLayout

    關(guān)于log4j日志擴(kuò)展---自定義PatternLayout

    這篇文章主要介紹了關(guān)于log4j日志擴(kuò)展---自定義PatternLayout,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • SpringBoot使用MockMvc進(jìn)行單元測(cè)試的實(shí)例代碼

    SpringBoot使用MockMvc進(jìn)行單元測(cè)試的實(shí)例代碼

    在Spring Boot應(yīng)用程序中,使用MockMvc進(jìn)行單元測(cè)試是一種有效的方式,可以驗(yàn)證控制器的行為和API的正確性,在這篇博客中,我們將介紹如何使用MockMvc對(duì)用戶控制器進(jìn)行測(cè)試,感興趣的朋友可以參考下
    2024-01-01
  • Springboot+Flowable?快速實(shí)現(xiàn)工作流的開發(fā)流程

    Springboot+Flowable?快速實(shí)現(xiàn)工作流的開發(fā)流程

    這篇文章主要介紹了Springboot+Flowable?快速實(shí)現(xiàn)工作流的開發(fā)流程,本文通過實(shí)例代碼圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-02-02

最新評(píng)論