亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring Boot集成Kafka的示例代碼

 更新時間:2018年04月08日 14:59:37   作者:寒武沒有紀(jì)  
本篇文章主要介紹了Spring Boot集成Kafka的示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

本文介紹了Spring Boot集成Kafka的示例代碼,分享給大家,也給自己留個筆記

系統(tǒng)環(huán)境

使用遠(yuǎn)程服務(wù)器上搭建的kafka服務(wù)

  1. Ubuntu 16.04 LTS
  2. kafka_2.12-0.11.0.0.tgz
  3. zookeeper-3.5.2-alpha.tar.gz

集成過程

1.創(chuàng)建spring boot工程,添加相關(guān)依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.laravelshao.springboot</groupId>
  <artifactId>spring-boot-integration-kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spring-boot-integration-kafka</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--kafka-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-json</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

2.添加配置信息,這里使用yml文件

spring:
 kafka:
  bootstrap-servers:X.X.X.X:9092
  producer:
   value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  consumer:
   group-id: test
   auto-offset-reset: earliest
   value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

3.創(chuàng)建消息對象

public class Message {
  private Integer id;
  private String msg;

  public Message() {
  }

  public Message(Integer id, String msg) {
    this.id = id;
    this.msg = msg;
  }

  public Integer getId() {
    return id;
  }

  public void setId(Integer id) {
    this.id = id;
  }

  public String getMsg() {
    return msg;
  }

  public void setMsg(String msg) {
    this.msg = msg;
  }

  @Override
  public String toString() {
    return "Message{" +
        "id=" + id +
        ", msg='" + msg + '\'' +
        '}';
  }
}

4.創(chuàng)建生產(chǎn)者

package com.laravelshao.springboot.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Producer {
  private static Logger log = LoggerFactory.getLogger(Producer.class);

  @Autowired
  private KafkaTemplate kafkaTemplate;

  public void send(String topic, Message message) {
    kafkaTemplate.send(topic, message);
    log.info("Producer->topic:{}, message:{}", topic, message);
  }

}

5.創(chuàng)建消費(fèi)者,使用@ KafkaListener注解監(jiān)聽主題

package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Consumer {
  private static Logger log = LoggerFactory.getLogger(Consumer.class);

  @KafkaListener(topics = "test_topic")
  public void receive(ConsumerRecord<String, Message> consumerRecord) {
    log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
  }

}

6.發(fā)送消費(fèi)測試

package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.Message;
import com.laravelshao.springboot.kafka.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class IntegrationKafkaApplication {

  public static void main(String[] args) throws InterruptedException {
    ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
    Producer producer = context.getBean(Producer.class);

    for (int i = 1; i < 10; i++) {
      producer.send("test_topic", new Message(i, "test topic message " + i));
      Thread.sleep(2000);
    }
  }

}

可以依次看到發(fā)送消息,消費(fèi)消息

異常問題

反序列化異常(自定義的消息對象不在kafka信任的包路徑下)?

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
 at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:745)

解決方法:將當(dāng)前包添加到kafka信任的包路徑下

spring:
 kafka:
  consumer:
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解

    Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解

    這篇文章主要介紹了Base64加解密的實(shí)現(xiàn)方式實(shí)例詳解的相關(guān)資料,這里提供了實(shí)現(xiàn)實(shí)例,幫助大家學(xué)習(xí)理解這部分內(nèi)容,需要的朋友可以參考下
    2017-08-08
  • Java開發(fā)者結(jié)合Node.js編程入門教程

    Java開發(fā)者結(jié)合Node.js編程入門教程

    這篇文章主要介紹了Java開發(fā)者結(jié)合Node.js編程入門教程,我將先向您展示如何使用Java EE創(chuàng)建一個簡單的Rest服務(wù)來讀取 MongoDB數(shù)據(jù)庫。然后我會用node.js來實(shí)現(xiàn)相同的功能,需要的朋友可以參考下
    2014-09-09
  • 一個簡易的Java多頁面隊列爬蟲程序

    一個簡易的Java多頁面隊列爬蟲程序

    這篇文章主要為大家詳細(xì)介紹了一個多頁面的java爬蟲,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-08-08
  • idea導(dǎo)入項目不顯示maven側(cè)邊欄的問題及解決方法

    idea導(dǎo)入項目不顯示maven側(cè)邊欄的問題及解決方法

    這篇文章主要介紹了idea導(dǎo)入項目不顯示maven側(cè)邊欄的問題及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • java 文件目錄讀寫刪除操作詳細(xì)實(shí)現(xiàn)代碼

    java 文件目錄讀寫刪除操作詳細(xì)實(shí)現(xiàn)代碼

    這篇文章主要介紹了java 文件讀寫刪操作詳細(xì)實(shí)現(xiàn)代碼,需要的朋友可以參考下
    2017-09-09
  • Java Linkedlist原理及實(shí)例詳解

    Java Linkedlist原理及實(shí)例詳解

    這篇文章主要介紹了Java Linkedlist原理及實(shí)例詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-01-01
  • Java遞歸實(shí)現(xiàn)迷宮游戲

    Java遞歸實(shí)現(xiàn)迷宮游戲

    這篇文章主要介紹了如何利用Java遞歸方法實(shí)現(xiàn)迷宮游戲,下面文章會詳細(xì)的從為問題描述開始,清晰的解題思路以及詳細(xì)的代碼實(shí)現(xiàn),具有一定的參考價值,需要的小伙伴可以參考一下
    2021-12-12
  • SpringBoot中的yml文件中讀取自定義配置信息及遇到問題小結(jié)

    SpringBoot中的yml文件中讀取自定義配置信息及遇到問題小結(jié)

    這篇文章主要介紹了SpringBoot中的yml文件中讀取自定義配置信息,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-06-06
  • java實(shí)現(xiàn)登錄驗(yàn)證碼功能

    java實(shí)現(xiàn)登錄驗(yàn)證碼功能

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)登錄驗(yàn)證碼功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-10-10
  • 淺談JAVA并發(fā)之ReentrantLock

    淺談JAVA并發(fā)之ReentrantLock

    本文主要介紹了基于AQS實(shí)現(xiàn)的ReentrantLock(重入鎖),感興趣的同學(xué),可以參考下。
    2021-06-06

最新評論