亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Kafka在客戶端實(shí)現(xiàn)消息的發(fā)送與讀取

 更新時(shí)間:2023年12月11日 10:10:42   作者:warybee  
這篇文章主要介紹了Kafka在客戶端實(shí)現(xiàn)消息的發(fā)送與讀取,KafkaProducer是用于發(fā)送消息的類,ProducerRecord類用于封裝Kafka的消息,KafkaProducer的實(shí)例化需要指定的參數(shù),Producer的參數(shù)定義在 org.apache.kafka.clients.producer.ProducerConfig類中,需要的朋友可以參考下

1.創(chuàng)建Maven工程

引入kafka相關(guān)依賴,POM文件如下:

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.7</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.生產(chǎn)者API(Producer API)

2.1 生成者處理流程

在這里插入圖片描述

  1. Producer創(chuàng)建時(shí),會(huì)創(chuàng)建一個(gè)Sender線程并設(shè)置為守護(hù)線程。
  2. 生產(chǎn)消息時(shí),內(nèi)部其實(shí)是異步流程;生產(chǎn)的消息先經(jīng)過(guò)攔截器->序列化器->**分區(qū)器,**然后將消 息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時(shí)創(chuàng)建)。
  3. 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到batch.size或者linger.ms達(dá)到上限,哪個(gè)先達(dá)到就算 哪個(gè)。
  4. 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤(pán)到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失 敗原因允許重試,那么客戶端內(nèi)部會(huì)對(duì)該消息進(jìn)行重試。
  5. 落盤(pán)到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者。
  6. 元數(shù)據(jù)返回有兩種方式:一種是通過(guò)阻塞直接返回,另一種是通過(guò)回調(diào)返回。

2.2 常用參數(shù)介紹

生產(chǎn)者主要的對(duì)象有: KafkaProducer , ProducerRecord 。

  • KafkaProducer 是用于發(fā)送消息的類;
  • ProducerRecord 類用于封裝Kafka的消息。

KafkaProducer 的實(shí)例化需要指定的參數(shù),Producer的參數(shù)定義在 org.apache.kafka.clients.producer.ProducerConfig類中。

常用參數(shù)說(shuō)明如下:

  • bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設(shè)置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個(gè)broker的地址,而不 是全部,當(dāng)生產(chǎn)者連接上此處指定的broker之后,在通過(guò)該連接發(fā)現(xiàn)集群 中的其他節(jié)點(diǎn)。
  • key.serializer: 要發(fā)送信息的key數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。

在這里插入圖片描述

  • **value.serializer:**要發(fā)送消息的alue數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。

acks: 默認(rèn)值:all。

acks=0: 生產(chǎn)者不等待broker對(duì)消息的確認(rèn),只要將消息放到緩沖區(qū),就認(rèn)為消息 已經(jīng)發(fā)送完成。 該情形不能保證broker是否真的收到了消息,retries配置也不會(huì)生效。發(fā) 送的消息的返回的消息偏移量永遠(yuǎn)是-1

acks=1 表示消息只需要寫(xiě)到主分區(qū)即可,然后就響應(yīng)客戶端,而不等待副本分區(qū)的 確認(rèn)。 在該情形下,如果主分區(qū)收到消息確認(rèn)之后就宕機(jī)了,而副本分區(qū)還沒(méi)來(lái)得 及同步該消息,則該消息丟失。

acks=all 首領(lǐng)分區(qū)會(huì)等待所有的ISR副本分區(qū)確認(rèn)記錄。 該處理保證了只要有一個(gè)ISR副本分區(qū)存活,消息就不會(huì)丟失。 這是Kafka最強(qiáng)的可靠性保證,等效于 acks=-1

  • retries: 設(shè)置該屬性為一個(gè)大于1的值,將在消息發(fā)送失敗的時(shí)候重新發(fā)送消 息。該重試與客戶端收到異常重新發(fā)送并無(wú)二至。允許重試但是不設(shè) 置參數(shù)max.in.flight.requests.per.connection為1,存在消息亂序 的可能,因?yàn)槿绻麅蓚€(gè)批次發(fā)送到同一個(gè)分區(qū),第一個(gè)失敗了重試, 第二個(gè)成功了,則第一個(gè)消息批在第二個(gè)消息批后。int類型的值,默 認(rèn):0,可選值:[0,…,2147483647
  • compression.type: 生產(chǎn)者生成數(shù)據(jù)的壓縮格式。默認(rèn)是none(沒(méi)有壓縮)。允許的 值:none,gzip,snappy和lz4。壓縮是對(duì)整個(gè)消息批次來(lái)講 的。消息批的效率也影響壓縮的比例。消息批越大,壓縮效率越好。默認(rèn)是none。

2.3 生產(chǎn)者代碼

package com.warybee;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
 * @author joy
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 設(shè)置key的序列化類
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 設(shè)置value的序列化類
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs);
        //發(fā)送100條消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer,String> producerRecord=new ProducerRecord<>
                    (       "test_topic_1",
                            0,
                            i,
                            "test topic msg "+i);
            //消息的異步確認(rèn)
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception==null){
                        System.out.println("消息的主題:"+recordMetadata.topic());
                        System.out.println("消息的分區(qū):"+recordMetadata.partition());
                        System.out.println("消息的偏移量:"+recordMetadata.offset());
                    }else {
                        System.out.println("發(fā)送消息異常");
                    }
                }
            });
        }
        // 關(guān)閉生產(chǎn)者
        kafkaProducer.close();
    }
}

3.消費(fèi)者API(Consumer API)

3.1 常用參數(shù)介紹

KafkaConsumer 的實(shí)例化需要指定的參數(shù),Consumer的參數(shù)定義在 org.apache.kafka.clients.consumer.ConsumerConfig類中。

常用參數(shù)說(shuō)明如下:

  • bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設(shè)置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個(gè)broker的地址,而不 是全部,當(dāng)生產(chǎn)者連接上此處指定的broker之后,在通過(guò)該連接發(fā)現(xiàn)集群 中的其他節(jié)點(diǎn)。
  • key.deserializer: key數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
  • value.deserializer: Value數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
  • group.id: 消費(fèi)組ID,用于指定當(dāng)前消費(fèi)者屬于哪個(gè)消費(fèi)組。
  • auto.offset.reset: 當(dāng)kafka中沒(méi)有偏移量或者當(dāng)前偏移量在服務(wù)器中不存在時(shí),kafka該如何處理?參數(shù)值如下:
  • earliest: automatically reset the offset to the earliest offset(自動(dòng)重置偏移量到最早的偏移量)
  • latest: automatically reset the offset to the latest offset(自動(dòng)重置偏移量為最新的)
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group(如果消費(fèi)組上一個(gè)偏移量不存在,向consumer 拋出異常)
  • anything: throw exception to the consumer.(向consumer 拋出異常)
  • client.id : 消費(fèi)消息的時(shí)候向服務(wù)器發(fā)送的id字符串。在ip/port基礎(chǔ)上 提供應(yīng)用的邏輯名稱,記錄在服務(wù)端的請(qǐng)求日志中,用于追蹤請(qǐng)求的源。
  • enable.auto.commit : 如果設(shè)置為true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量。

3.2 消費(fèi)者與消費(fèi)組概念介紹

每一個(gè)Consumer屬于一個(gè)特定的Consumer Group,消費(fèi)者可以通過(guò)指定group.id,來(lái)確定其所在消費(fèi)組。

group_id一般設(shè)置為應(yīng)用的邏輯名稱。比如多個(gè)訂單處理程序組成一個(gè)消費(fèi)組,可以設(shè)置group_id 為"order_process"。

消費(fèi)組均衡地給消費(fèi)者分配分區(qū),每個(gè)分區(qū)只由消費(fèi)組中一個(gè)消費(fèi)者消費(fèi)

一個(gè)擁有四個(gè)分區(qū)的主題,包含一個(gè)消費(fèi)者的消費(fèi)組。此時(shí),消費(fèi)組中的消費(fèi)者消費(fèi)主題中的所有分區(qū)。

在這里插入圖片描述

如果在消費(fèi)組中添加一個(gè)消費(fèi)者2,則每個(gè)消費(fèi)者分別從兩個(gè)分區(qū)接收消息。

在這里插入圖片描述

如果消費(fèi)組有四個(gè)消費(fèi)者,則每個(gè)消費(fèi)者可以分配到一個(gè)分區(qū)

在這里插入圖片描述

如果向消費(fèi)組中添加更多的消費(fèi)者,超過(guò)主題分區(qū)數(shù)量,則有一部分消費(fèi)者就會(huì)閑置,不會(huì)接收任 何消息

在這里插入圖片描述

向消費(fèi)組添加消費(fèi)者是橫向擴(kuò)展消費(fèi)能力的主要方式。

3.3 消費(fèi)者代碼

package com.warybee;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
 * @author joy
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
        // 如果是集群,則可以通過(guò)此初始連接發(fā)現(xiàn)集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化類
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化類
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //創(chuàng)建消費(fèi)者對(duì)象
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_topic_1");
        //消費(fèi)者訂閱主題
        consumer.subscribe(topics);
        while (true){
            //批量拉取主題消息,每3秒拉取一次
            ConsumerRecords<Integer, String> records = consumer.poll(3000);
            //變量消息
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("主題:"+record.topic() + "\t"
                       +"分區(qū):" + record.partition() + "\t"
                        +"偏移量:" +  + record.offset() + "\t"
                        +"Key:"+ record.key() + "\t"
                        +"Value:"+ record.value());
            }
        }
    }
}

依次運(yùn)行生產(chǎn)者和消費(fèi)者??刂婆_(tái)可以看到消費(fèi)者接收到的消息

4. 客戶端鏈接異常信息處理

如果運(yùn)行代碼過(guò)程中,java客戶端連接出現(xiàn)Connection refused: no further information錯(cuò)誤:

java.net.ConnectException: Connection refused: no further information
   ............................省略其他錯(cuò)誤信息...................

修改 ${KAFKA_HOME}/config/server.properties

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners = PLAINTEXT://localhost:9092

將localhost修改為kafka所在服務(wù)器的IP地址即可。如果java程序和kafka在同一個(gè)服務(wù)器上,則不需要修改。

到此這篇關(guān)于Kafka在客戶端實(shí)現(xiàn)消息的發(fā)送與讀取的文章就介紹到這了,更多相關(guān)Kafka消息發(fā)送與讀取內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java線程池ThreadPoolExecutor類使用小結(jié)

    java線程池ThreadPoolExecutor類使用小結(jié)

    這篇文章主要介紹了java線程池ThreadPoolExecutor類使用,本文主要對(duì)ThreadPoolExecutor的使用方法進(jìn)行一個(gè)詳細(xì)的概述,示例代碼介紹了ThreadPoolExecutor的構(gòu)造函數(shù)的相關(guān)知識(shí),感興趣的朋友一起看看吧
    2022-03-03
  • java中transient關(guān)鍵字分析

    java中transient關(guān)鍵字分析

    這篇文章主要介紹了java中transient關(guān)鍵字分析,transient與類對(duì)象的序列化息息相關(guān),序列化保存的是 類對(duì)象 狀態(tài),被transient關(guān)鍵字修飾的成員變量,在類的實(shí)例化對(duì)象的序列化處理過(guò)程中會(huì)被忽略,變量不會(huì)貫穿對(duì)象的序列化和反序列化,需要的朋友可以參考下
    2023-09-09
  • 深入理解注解與自定義注解的一些概念

    深入理解注解與自定義注解的一些概念

    今天給大家?guī)?lái)的文章是注解的相關(guān)知識(shí),本文圍繞著注解與自定義注解的一些概念展開(kāi),文中詳細(xì)介紹了這些知識(shí),需要的朋友可以參考下
    2021-06-06
  • 使用SpringBoot簡(jiǎn)單實(shí)現(xiàn)無(wú)感知的刷新 Token功能

    使用SpringBoot簡(jiǎn)單實(shí)現(xiàn)無(wú)感知的刷新 Token功能

    實(shí)現(xiàn)無(wú)感知的刷新 Token 是一種提升用戶體驗(yàn)的常用技術(shù),可以在用戶使用應(yīng)用時(shí)自動(dòng)更新 Token,無(wú)需用戶手動(dòng)干預(yù),這種技術(shù)在需要長(zhǎng)時(shí)間保持用戶登錄狀態(tài)的應(yīng)用中非常有用,以下是使用Spring Boot實(shí)現(xiàn)無(wú)感知刷新Token的一個(gè)場(chǎng)景案例和相應(yīng)的示例代碼
    2024-09-09
  • Java實(shí)現(xiàn)一個(gè)簡(jiǎn)易版的多級(jí)菜單功能

    Java實(shí)現(xiàn)一個(gè)簡(jiǎn)易版的多級(jí)菜單功能

    這篇文章主要給大家介紹了關(guān)于Java如何實(shí)現(xiàn)一個(gè)簡(jiǎn)易版的多級(jí)菜單功能的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-01-01
  • Lambda表達(dá)式的使用及注意事項(xiàng)

    Lambda表達(dá)式的使用及注意事項(xiàng)

    這篇文章主要介紹了Lambda表達(dá)式的使用及注意事項(xiàng),主要圍繞?Lambda表達(dá)式的省略模式?Lambda表達(dá)式和匿名內(nèi)部類的區(qū)別的相關(guān)內(nèi)容展開(kāi)詳情,感興趣的小伙伴可以參考一下
    2022-06-06
  • JAVA實(shí)現(xiàn)的簡(jiǎn)單萬(wàn)年歷代碼

    JAVA實(shí)現(xiàn)的簡(jiǎn)單萬(wàn)年歷代碼

    這篇文章主要介紹了JAVA實(shí)現(xiàn)的簡(jiǎn)單萬(wàn)年歷代碼,涉及Java日期操作的相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-10-10
  • IDEA設(shè)置Tab選項(xiàng)卡快速的操作

    IDEA設(shè)置Tab選項(xiàng)卡快速的操作

    這篇文章主要介紹了IDEA設(shè)置Tab選項(xiàng)卡快速的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • springboot通過(guò)spel結(jié)合aop實(shí)現(xiàn)動(dòng)態(tài)傳參的案例

    springboot通過(guò)spel結(jié)合aop實(shí)現(xiàn)動(dòng)態(tài)傳參的案例

    SpEl 是Spring框架中的一個(gè)利器,Spring通過(guò)SpEl能在運(yùn)行時(shí)構(gòu)建復(fù)雜表達(dá)式、存取對(duì)象屬性、對(duì)象方法調(diào)用等,今天通過(guò)本文給大家介紹springboot?spel結(jié)合aop實(shí)現(xiàn)動(dòng)態(tài)傳參,需要的朋友可以參考下
    2022-07-07
  • Java讀寫(xiě)Windows共享文件夾的方法實(shí)例

    Java讀寫(xiě)Windows共享文件夾的方法實(shí)例

    本篇文章主要介紹了Java讀寫(xiě)Windows共享文件夾的方法實(shí)例,具有一定的參考價(jià)值,有興趣的同學(xué)可以了解一下。
    2016-11-11

最新評(píng)論