Kafka在客戶端實(shí)現(xiàn)消息的發(fā)送與讀取
1.創(chuàng)建Maven工程
引入kafka相關(guān)依賴,POM文件如下:
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
2.生產(chǎn)者API(Producer API)
2.1 生成者處理流程
- Producer創(chuàng)建時(shí),會(huì)創(chuàng)建一個(gè)Sender線程并設(shè)置為守護(hù)線程。
- 生產(chǎn)消息時(shí),內(nèi)部其實(shí)是異步流程;生產(chǎn)的消息先經(jīng)過(guò)攔截器->序列化器->**分區(qū)器,**然后將消 息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時(shí)創(chuàng)建)。
- 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到batch.size或者linger.ms達(dá)到上限,哪個(gè)先達(dá)到就算 哪個(gè)。
- 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤(pán)到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失 敗原因允許重試,那么客戶端內(nèi)部會(huì)對(duì)該消息進(jìn)行重試。
- 落盤(pán)到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者。
- 元數(shù)據(jù)返回有兩種方式:一種是通過(guò)阻塞直接返回,另一種是通過(guò)回調(diào)返回。
2.2 常用參數(shù)介紹
生產(chǎn)者主要的對(duì)象有: KafkaProducer , ProducerRecord 。
- KafkaProducer 是用于發(fā)送消息的類;
- ProducerRecord 類用于封裝Kafka的消息。
KafkaProducer 的實(shí)例化需要指定的參數(shù),Producer的參數(shù)定義在 org.apache.kafka.clients.producer.ProducerConfig類中。
常用參數(shù)說(shuō)明如下:
- bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設(shè)置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個(gè)broker的地址,而不 是全部,當(dāng)生產(chǎn)者連接上此處指定的broker之后,在通過(guò)該連接發(fā)現(xiàn)集群 中的其他節(jié)點(diǎn)。
- key.serializer: 要發(fā)送信息的key數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。
- **value.serializer:**要發(fā)送消息的alue數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。
acks: 默認(rèn)值:all。
acks=0: 生產(chǎn)者不等待broker對(duì)消息的確認(rèn),只要將消息放到緩沖區(qū),就認(rèn)為消息 已經(jīng)發(fā)送完成。 該情形不能保證broker是否真的收到了消息,retries配置也不會(huì)生效。發(fā) 送的消息的返回的消息偏移量永遠(yuǎn)是-1
acks=1 表示消息只需要寫(xiě)到主分區(qū)即可,然后就響應(yīng)客戶端,而不等待副本分區(qū)的 確認(rèn)。 在該情形下,如果主分區(qū)收到消息確認(rèn)之后就宕機(jī)了,而副本分區(qū)還沒(méi)來(lái)得 及同步該消息,則該消息丟失。
acks=all 首領(lǐng)分區(qū)會(huì)等待所有的ISR副本分區(qū)確認(rèn)記錄。 該處理保證了只要有一個(gè)ISR副本分區(qū)存活,消息就不會(huì)丟失。 這是Kafka最強(qiáng)的可靠性保證,等效于 acks=-1
- retries: 設(shè)置該屬性為一個(gè)大于1的值,將在消息發(fā)送失敗的時(shí)候重新發(fā)送消 息。該重試與客戶端收到異常重新發(fā)送并無(wú)二至。允許重試但是不設(shè) 置參數(shù)max.in.flight.requests.per.connection為1,存在消息亂序 的可能,因?yàn)槿绻麅蓚€(gè)批次發(fā)送到同一個(gè)分區(qū),第一個(gè)失敗了重試, 第二個(gè)成功了,則第一個(gè)消息批在第二個(gè)消息批后。int類型的值,默 認(rèn):0,可選值:[0,…,2147483647
- compression.type: 生產(chǎn)者生成數(shù)據(jù)的壓縮格式。默認(rèn)是none(沒(méi)有壓縮)。允許的 值:none,gzip,snappy和lz4。壓縮是對(duì)整個(gè)消息批次來(lái)講 的。消息批的效率也影響壓縮的比例。消息批越大,壓縮效率越好。默認(rèn)是none。
2.3 生產(chǎn)者代碼
package com.warybee; import org.apache.kafka.clients.producer.*; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; /** * @author joy */ public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); // 設(shè)置key的序列化類 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); // 設(shè)置value的序列化類 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configs.put(ProducerConfig.ACKS_CONFIG,"all"); KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs); //發(fā)送100條消息 for (int i = 0; i < 100; i++) { ProducerRecord<Integer,String> producerRecord=new ProducerRecord<> ( "test_topic_1", 0, i, "test topic msg "+i); //消息的異步確認(rèn) kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception==null){ System.out.println("消息的主題:"+recordMetadata.topic()); System.out.println("消息的分區(qū):"+recordMetadata.partition()); System.out.println("消息的偏移量:"+recordMetadata.offset()); }else { System.out.println("發(fā)送消息異常"); } } }); } // 關(guān)閉生產(chǎn)者 kafkaProducer.close(); } }
3.消費(fèi)者API(Consumer API)
3.1 常用參數(shù)介紹
KafkaConsumer 的實(shí)例化需要指定的參數(shù),Consumer的參數(shù)定義在 org.apache.kafka.clients.consumer.ConsumerConfig類中。
常用參數(shù)說(shuō)明如下:
- bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設(shè)置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個(gè)broker的地址,而不 是全部,當(dāng)生產(chǎn)者連接上此處指定的broker之后,在通過(guò)該連接發(fā)現(xiàn)集群 中的其他節(jié)點(diǎn)。
- key.deserializer: key數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
- value.deserializer: Value數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實(shí)現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
- group.id: 消費(fèi)組ID,用于指定當(dāng)前消費(fèi)者屬于哪個(gè)消費(fèi)組。
- auto.offset.reset: 當(dāng)kafka中沒(méi)有偏移量或者當(dāng)前偏移量在服務(wù)器中不存在時(shí),kafka該如何處理?參數(shù)值如下:
- earliest: automatically reset the offset to the earliest offset(自動(dòng)重置偏移量到最早的偏移量)
- latest: automatically reset the offset to the latest offset(自動(dòng)重置偏移量為最新的)
- none: throw exception to the consumer if no previous offset is found for the consumer’s group(如果消費(fèi)組上一個(gè)偏移量不存在,向consumer 拋出異常)
- anything: throw exception to the consumer.(向consumer 拋出異常)
- client.id : 消費(fèi)消息的時(shí)候向服務(wù)器發(fā)送的id字符串。在ip/port基礎(chǔ)上 提供應(yīng)用的邏輯名稱,記錄在服務(wù)端的請(qǐng)求日志中,用于追蹤請(qǐng)求的源。
- enable.auto.commit : 如果設(shè)置為true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量。
3.2 消費(fèi)者與消費(fèi)組概念介紹
每一個(gè)Consumer屬于一個(gè)特定的Consumer Group,消費(fèi)者可以通過(guò)指定group.id,來(lái)確定其所在消費(fèi)組。
group_id一般設(shè)置為應(yīng)用的邏輯名稱。比如多個(gè)訂單處理程序組成一個(gè)消費(fèi)組,可以設(shè)置group_id 為"order_process"。
消費(fèi)組均衡地給消費(fèi)者分配分區(qū),每個(gè)分區(qū)只由消費(fèi)組中一個(gè)消費(fèi)者消費(fèi)
一個(gè)擁有四個(gè)分區(qū)的主題,包含一個(gè)消費(fèi)者的消費(fèi)組。此時(shí),消費(fèi)組中的消費(fèi)者消費(fèi)主題中的所有分區(qū)。
如果在消費(fèi)組中添加一個(gè)消費(fèi)者2,則每個(gè)消費(fèi)者分別從兩個(gè)分區(qū)接收消息。
如果消費(fèi)組有四個(gè)消費(fèi)者,則每個(gè)消費(fèi)者可以分配到一個(gè)分區(qū)
如果向消費(fèi)組中添加更多的消費(fèi)者,超過(guò)主題分區(qū)數(shù)量,則有一部分消費(fèi)者就會(huì)閑置,不會(huì)接收任 何消息
向消費(fèi)組添加消費(fèi)者是橫向擴(kuò)展消費(fèi)能力的主要方式。
3.3 消費(fèi)者代碼
package com.warybee; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; /** * @author joy */ public class KafkaConsumerDemo { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址 // 如果是集群,則可以通過(guò)此初始連接發(fā)現(xiàn)集群中的其他broker configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); //KEY反序列化類 configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //value反序列化類 configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //創(chuàng)建消費(fèi)者對(duì)象 KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs); List<String> topics = new ArrayList<>(); topics.add("test_topic_1"); //消費(fèi)者訂閱主題 consumer.subscribe(topics); while (true){ //批量拉取主題消息,每3秒拉取一次 ConsumerRecords<Integer, String> records = consumer.poll(3000); //變量消息 for (ConsumerRecord<Integer, String> record : records) { System.out.println("主題:"+record.topic() + "\t" +"分區(qū):" + record.partition() + "\t" +"偏移量:" + + record.offset() + "\t" +"Key:"+ record.key() + "\t" +"Value:"+ record.value()); } } } }
依次運(yùn)行生產(chǎn)者和消費(fèi)者??刂婆_(tái)可以看到消費(fèi)者接收到的消息
4. 客戶端鏈接異常信息處理
如果運(yùn)行代碼過(guò)程中,java客戶端連接出現(xiàn)Connection refused: no further information錯(cuò)誤:
java.net.ConnectException: Connection refused: no further information ............................省略其他錯(cuò)誤信息...................
修改 ${KAFKA_HOME}/config/server.properties
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 listeners = PLAINTEXT://localhost:9092
將localhost修改為kafka所在服務(wù)器的IP地址即可。如果java程序和kafka在同一個(gè)服務(wù)器上,則不需要修改。
到此這篇關(guān)于Kafka在客戶端實(shí)現(xiàn)消息的發(fā)送與讀取的文章就介紹到這了,更多相關(guān)Kafka消息發(fā)送與讀取內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java線程池ThreadPoolExecutor類使用小結(jié)
這篇文章主要介紹了java線程池ThreadPoolExecutor類使用,本文主要對(duì)ThreadPoolExecutor的使用方法進(jìn)行一個(gè)詳細(xì)的概述,示例代碼介紹了ThreadPoolExecutor的構(gòu)造函數(shù)的相關(guān)知識(shí),感興趣的朋友一起看看吧2022-03-03使用SpringBoot簡(jiǎn)單實(shí)現(xiàn)無(wú)感知的刷新 Token功能
實(shí)現(xiàn)無(wú)感知的刷新 Token 是一種提升用戶體驗(yàn)的常用技術(shù),可以在用戶使用應(yīng)用時(shí)自動(dòng)更新 Token,無(wú)需用戶手動(dòng)干預(yù),這種技術(shù)在需要長(zhǎng)時(shí)間保持用戶登錄狀態(tài)的應(yīng)用中非常有用,以下是使用Spring Boot實(shí)現(xiàn)無(wú)感知刷新Token的一個(gè)場(chǎng)景案例和相應(yīng)的示例代碼2024-09-09Java實(shí)現(xiàn)一個(gè)簡(jiǎn)易版的多級(jí)菜單功能
這篇文章主要給大家介紹了關(guān)于Java如何實(shí)現(xiàn)一個(gè)簡(jiǎn)易版的多級(jí)菜單功能的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-01-01JAVA實(shí)現(xiàn)的簡(jiǎn)單萬(wàn)年歷代碼
這篇文章主要介紹了JAVA實(shí)現(xiàn)的簡(jiǎn)單萬(wàn)年歷代碼,涉及Java日期操作的相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-10-10springboot通過(guò)spel結(jié)合aop實(shí)現(xiàn)動(dòng)態(tài)傳參的案例
SpEl 是Spring框架中的一個(gè)利器,Spring通過(guò)SpEl能在運(yùn)行時(shí)構(gòu)建復(fù)雜表達(dá)式、存取對(duì)象屬性、對(duì)象方法調(diào)用等,今天通過(guò)本文給大家介紹springboot?spel結(jié)合aop實(shí)現(xiàn)動(dòng)態(tài)傳參,需要的朋友可以參考下2022-07-07Java讀寫(xiě)Windows共享文件夾的方法實(shí)例
本篇文章主要介紹了Java讀寫(xiě)Windows共享文件夾的方法實(shí)例,具有一定的參考價(jià)值,有興趣的同學(xué)可以了解一下。2016-11-11