Spring Boot集成Kafka的示例代碼
本文介紹了Spring Boot集成Kafka的示例代碼,分享給大家,也給自己留個筆記
系統(tǒng)環(huán)境
使用遠(yuǎn)程服務(wù)器上搭建的kafka服務(wù)
- Ubuntu 16.04 LTS
- kafka_2.12-0.11.0.0.tgz
- zookeeper-3.5.2-alpha.tar.gz
集成過程
1.創(chuàng)建spring boot工程,添加相關(guān)依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.laravelshao.springboot</groupId> <artifactId>spring-boot-integration-kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-boot-integration-kafka</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-json</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2.添加配置信息,這里使用yml文件
spring: kafka: bootstrap-servers:X.X.X.X:9092 producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: group-id: test auto-offset-reset: earliest value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: com.laravelshao.springboot.kafka
3.創(chuàng)建消息對象
public class Message { private Integer id; private String msg; public Message() { } public Message(Integer id, String msg) { this.id = id; this.msg = msg; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } @Override public String toString() { return "Message{" + "id=" + id + ", msg='" + msg + '\'' + '}'; } }
4.創(chuàng)建生產(chǎn)者
package com.laravelshao.springboot.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * Created by shaoqinghua on 2018/3/23. */ @Component public class Producer { private static Logger log = LoggerFactory.getLogger(Producer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, Message message) { kafkaTemplate.send(topic, message); log.info("Producer->topic:{}, message:{}", topic, message); } }
5.創(chuàng)建消費(fèi)者,使用@ KafkaListener注解監(jiān)聽主題
package com.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * Created by shaoqinghua on 2018/3/23. */ @Component public class Consumer { private static Logger log = LoggerFactory.getLogger(Consumer.class); @KafkaListener(topics = "test_topic") public void receive(ConsumerRecord<String, Message> consumerRecord) { log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value()); } }
6.發(fā)送消費(fèi)測試
package com.laravelshao.springboot; import com.laravelshao.springboot.kafka.Message; import com.laravelshao.springboot.kafka.Producer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; @SpringBootApplication public class IntegrationKafkaApplication { public static void main(String[] args) throws InterruptedException { ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args); Producer producer = context.getBean(Producer.class); for (int i = 1; i < 10; i++) { producer.send("test_topic", new Message(i, "test topic message " + i)); Thread.sleep(2000); } } }
可以依次看到發(fā)送消息,消費(fèi)消息
異常問題
反序列化異常(自定義的消息對象不在kafka信任的包路徑下)?
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
解決方法:將當(dāng)前包添加到kafka信任的包路徑下
spring: kafka: consumer: properties: spring: json: trusted: packages: com.laravelshao.springboot.kafka
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解
這篇文章主要介紹了Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解的相關(guān)資料,這里提供了實(shí)現(xiàn)實(shí)例,幫助大家學(xué)習(xí)理解這部分內(nèi)容,需要的朋友可以參考下2017-08-08Java開發(fā)者結(jié)合Node.js編程入門教程
這篇文章主要介紹了Java開發(fā)者結(jié)合Node.js編程入門教程,我將先向您展示如何使用Java EE創(chuàng)建一個簡單的Rest服務(wù)來讀取 MongoDB數(shù)據(jù)庫。然后我會用node.js來實(shí)現(xiàn)相同的功能,需要的朋友可以參考下2014-09-09idea導(dǎo)入項目不顯示maven側(cè)邊欄的問題及解決方法
這篇文章主要介紹了idea導(dǎo)入項目不顯示maven側(cè)邊欄的問題及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07java 文件目錄讀寫刪除操作詳細(xì)實(shí)現(xiàn)代碼
這篇文章主要介紹了java 文件讀寫刪操作詳細(xì)實(shí)現(xiàn)代碼,需要的朋友可以參考下2017-09-09SpringBoot中的yml文件中讀取自定義配置信息及遇到問題小結(jié)
這篇文章主要介紹了SpringBoot中的yml文件中讀取自定義配置信息,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06