Kafka中使用Avro序列化和反序列化詳解
1.自定義序列化器與反序列化器
1.1 定義Order實體類
public class Order implements Serializable {
private Integer OrderId;
private String title;
public Order() {
}
public Order(Integer orderId, String title) {
OrderId = orderId;
this.title = title;
}
// 省略必要的get與set方法
}1.2 定義Order序列化類
由于Kafka中的數(shù)據(jù)都是字節(jié)數(shù)組,在將消息發(fā)送到Kafka之前需要先將數(shù)據(jù)序列化為字節(jié)數(shù)組。 序列化器的作用就是用于序列化要發(fā)送的消息的。
Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定義序列化器,將 泛型指定類型的數(shù)據(jù)轉(zhuǎn)換為字節(jié)數(shù)組。
Kafka提供了如下常用類型的序列化類:

自定義序列化器需要實現(xiàn)org.apache.kafka.common.serialization.Serializer接口,并實現(xiàn)其中 的 serialize 方法。
public class OrderSerializer implements Serializer<Order> {
@Override
public byte[] serialize(String topic, Order data) {
try {
if (data == null)
return null;
Integer orderId = data.getOrderId();
String title = data.getTitle();
int length = 0;
byte[] bytes = null;
if (null != title) {
bytes = title.getBytes("utf-8");
length = bytes.length;
}
//前4個字節(jié)保存orderId,
//第二個4個字節(jié)保存title字段的長度
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
buffer.putInt(orderId);
buffer.putInt(length);
buffer.put(bytes);
return buffer.array();
} catch (UnsupportedEncodingException e) {
throw new SerializationException("序列化數(shù)據(jù)異常");
}
}
}1.3 生產(chǎn)者代碼
package com.warybee.c1;
import com.warybee.model.Order;
import com.warybee.serializer.OrderSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
// 設(shè)置key的序列化類
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
// 設(shè)置自定義的序列化類
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class);
configs.put(ProducerConfig.ACKS_CONFIG,"all");
KafkaProducer<Integer, Order> kafkaProducer=new KafkaProducer<Integer, Order>(configs);
//定義order
Order order=new Order();
order.setOrderId(1);
order.setTitle("iphone13 pro 256G");
ProducerRecord<Integer,Order> producerRecord=new ProducerRecord<Integer,Order>
( "test_order_topic",
0,
order.getOrderId(),
order);
//消息的異步確認(rèn)
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception==null){
System.out.println("消息的主題:"+recordMetadata.topic());
System.out.println("消息的分區(qū):"+recordMetadata.partition());
System.out.println("消息的偏移量:"+recordMetadata.offset());
}else {
System.out.println("發(fā)送消息異常");
}
}
});
// 關(guān)閉生產(chǎn)者
kafkaProducer.close();
}
}1.4. 定義Order反序列化器
自定義反序列化類,需要實現(xiàn) org.apache.kafka.common.serialization.Deserializer 接 口,并且實現(xiàn)其中的deserialize方法。
package com.warybee.deserializer;
import com.warybee.model.Order;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
/**
* @author joy
* @description Order反序列化類
*/
public class OrderDeserializer implements Deserializer<Order> {
@Override
public Order deserialize(String topic, byte[] data) {
ByteBuffer allocate = ByteBuffer.allocate(data.length);
allocate.put(data);
allocate.flip();
Integer orderId = allocate.getInt();
int length = allocate.getInt();
String title = new String(data, 8, length);
return new Order(orderId, title);
}
}1.5 消費者代碼
package com.warybee.c1;
import com.warybee.deserializer.OrderDeserializer;
import com.warybee.model.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author joy
*/
public class KafkaConsumerDemo {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
// 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
//KEY反序列化類
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
//自定義value反序列化類
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo-3");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//創(chuàng)建消費者對象
KafkaConsumer<Integer, Order> consumer = new KafkaConsumer<Integer, Order>(configs);
List<String> topics = new ArrayList<>();
topics.add("test_order_topic");
//消費者訂閱主題
consumer.subscribe(topics);
while (true){
//批量拉取主題消息,每3秒拉取一次
ConsumerRecords<Integer, Order> records = consumer.poll(3000);
//變量消息
for (ConsumerRecord<Integer, Order> record : records) {
System.out.println(record.topic() + "\t"
+ record.partition() + "\t"
+ record.offset() + "\t"
+ record.key() + "\t"
+ record.value().getTitle());
}
}
}
}上面實現(xiàn)的序列化與反序列化類,定義繁瑣,不具有通用性,一不小心就會BUG滿天飛,不適合在實際項目中使用,只做了解原理即可。接下來使用Apache Avro來實現(xiàn)序列化和反序列化
2. 使用Avro序列化和反序列化
2.1 Apache Avro介紹
Apache Avro是一種與編程語言無關(guān)的序列化格式。提供了一種共享數(shù)據(jù)文件的方式。Avro 數(shù)據(jù)通過與語言無關(guān)的 schema 來定義。schema 通過 JSON 來描述,數(shù)據(jù)被序列化成二進(jìn)制文件或 JSON 文件,不過一般會使用二進(jìn)制文件。Avro 在讀寫文件時需要用到 schema,schema 一般會被內(nèi)嵌在數(shù)據(jù)文件里。
2.2 創(chuàng)建Maven項目
引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.7</version>
</dependency>
<!--Apache Avro 依賴-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
Avro 插件
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/java/com/warybee/avro/schema/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
Avro插件參數(shù)說明:
- sourceDirectory:schema 文件所在目錄
- outputDirectory:根據(jù)schema 文件生成的類文件到哪個目錄
2.3 創(chuàng)建schema 文件
Apache Avro schema 是使用 JSON 定義的。詳細(xì)的介紹,可以參考官網(wǎng)文檔
定義一個order.avsc文件(文件所在目錄要與上一步配置的sourceDirectory保持一致),內(nèi)容如下:
{"namespace": "com.warybee.avro",
"type": "record",
"name": "Order",
"fields": [
{"name": "orderId", "type": "int"},
{"name": "title", "type": "string"},
{"name": "num", "type": "int"}
]
}
IDEA話,可以安裝一個Apache Avro IDL Schema Support插件,安裝插件后編寫schema 有智能提示。
2.4.Avro生成entity
上面配置了Avro 插件,通過maven命令,生成即可。
mvn install
或者IDEA右鍵->RUN Maven->install

2.5 生產(chǎn)者代碼
package com.warybee.avro;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
* @author joy
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
// 設(shè)置key的序列化類
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
// 設(shè)置value的序列化類
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
configs.put(ProducerConfig.ACKS_CONFIG,"all");
KafkaProducer<Integer,byte[]> producer=new KafkaProducer<Integer, byte[]>(configs);
//發(fā)送100條消息
for (int i = 0; i < 100; i++) {
Order order=Order.newBuilder()
.setOrderId(i+1)
.setTitle("訂單: "+(i+1)+" iphone 13 pro 256G")
.setNum(1)
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, (BinaryEncoder)null);
SpecificDatumWriter writer = new SpecificDatumWriter(order.getSchema());
try {
writer.write(order, encoder);
encoder.flush();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
ProducerRecord<Integer,byte[]> record=new ProducerRecord<>(
"test_avro_topic",
0,
order.getOrderId(),
out.toByteArray());
//發(fā)送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception==null){
System.out.println("消息的主題:"+metadata.topic());
System.out.println("消息的分區(qū):"+metadata.partition());
System.out.println("消息的偏移量:"+metadata.offset());
}else {
System.out.println("發(fā)送消息異常");
}
}
});
}
// 關(guān)閉生產(chǎn)者
producer.close();
}
}2.6 消費者代碼
package com.warybee.avro;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author joy
*/
public class KafkaConsumerDemo {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 設(shè)置連接Kafka的初始連接用到的服務(wù)器地址
// 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
//KEY反序列化類
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
//value反序列化類
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
//消費者所在的組ID
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo.avro");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//創(chuàng)建消費者對象
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<Integer, byte[]>(configs);
List<String> topics = new ArrayList<>();
topics.add("test_avro_topic");
//消費者訂閱主題
consumer.subscribe(topics);
SpecificDatumReader<Order> reader = new SpecificDatumReader<>(Order.getClassSchema());
try {
while (true){
//批量拉取主題消息,每3秒拉取一次
ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
for (ConsumerRecord<Integer, byte[]> record : records) {
Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
Order order=null;
try {
order=reader.read(null,decoder);
System.out.println("訂單ID:"+order.getOrderId()+"\t"
+"訂單標(biāo)題:"+order.getTitle()+"\t"
+"數(shù)量:"+order.getNum());
} catch (IOException e) {
e.printStackTrace();
}
}
}
}finally {
consumer.close();
}
}
}到此這篇關(guān)于Kafka中使用Avro序列化和反序列化詳解的文章就介紹到這了,更多相關(guān)Avro序列化和反序列化內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot實現(xiàn)qq郵箱驗證碼注冊和登錄驗證功能
這篇文章主要給大家介紹了關(guān)于Spring Boot實現(xiàn)qq郵箱驗證碼注冊和登錄驗證功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12
Spring?Boot使用線程池處理上萬條數(shù)據(jù)插入功能
這篇文章主要介紹了Spring?Boot使用線程池處理上萬條數(shù)據(jù)插入功能,使用步驟是先創(chuàng)建一個線程池的配置,讓Spring Boot加載,用來定義如何創(chuàng)建一個ThreadPoolTaskExecutor,本文通過實例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2022-08-08
IDEA(2022.2)搭建Servlet基本框架超詳細(xì)步驟
這篇文章主要給大家介紹了關(guān)于IDEA(2022.2)搭建Servlet基本框架超詳細(xì)步驟,Servlet容器負(fù)責(zé)Servlet和客戶的通信以及調(diào)用Servlet的方法,Servlet和客戶的通信采用"請求/響應(yīng)"的模式,需要的朋友可以參考下2023-10-10
Java并發(fā)編程中的CyclicBarrier使用解析
這篇文章主要介紹了Java并發(fā)編程中的CyclicBarrier使用解析,CyclicBarrier從字面意思上來看,循環(huán)柵欄,這篇文章就來分析下是到底是如何實現(xiàn)循環(huán)和柵欄的,需要的朋友可以參考下2023-12-12
java 利用反射機制,獲取實體所有屬性和方法,并對屬性賦值
這篇文章主要介紹了 java 利用反射機制,獲取實體所有屬性和方法,并對屬性賦值的相關(guān)資料,需要的朋友可以參考下2017-01-01
Java?生成透明圖片的設(shè)置實現(xiàn)demo
這篇文章主要為大家介紹了Java?生成透明圖片的設(shè)置實現(xiàn)demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02

