亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Java中Springboot集成Kafka實現(xiàn)消息發(fā)送和接收功能

 更新時間:2025年01月24日 14:38:54   作者:cccl.  
Kafka是一個高吞吐量的分布式發(fā)布-訂閱消息系統(tǒng),主要用于處理大規(guī)模數(shù)據(jù)流,它由生產(chǎn)者、消費者、主題、分區(qū)和代理等組件構(gòu)成,Kafka可以實現(xiàn)消息隊列、數(shù)據(jù)存儲和流處理等功能,在Java中,可以使用Spring Boot集成Kafka實現(xiàn)消息的發(fā)送和接收,感興趣的朋友跟隨小編一起看看吧

一、Kafka 簡介

Kafka 是由 Apache 軟件基金會開發(fā)的一個開源流處理平臺,最初由 LinkedIn 公司開發(fā),并于 2011 年開源。它是一種高吞吐量的分布式發(fā)布 - 訂閱消息系統(tǒng),以可持久化、高吞吐、低延遲、高容錯等特性而著稱。
Kafka 主要由生產(chǎn)者(Producer)、消費者(Consumer)、主題(Topic)、分區(qū)(Partition)和代理(Broker)等組件構(gòu)成。生產(chǎn)者負(fù)責(zé)將數(shù)據(jù)發(fā)送到 Kafka 集群,消費者從集群中讀取數(shù)據(jù)。主題是一種邏輯上的分類,數(shù)據(jù)被發(fā)送到特定的主題。每個主題又可以劃分為多個分區(qū),以實現(xiàn)數(shù)據(jù)的并行處理和提高系統(tǒng)的可擴展性。代理則是 Kafka 集群中的服務(wù)器節(jié)點,負(fù)責(zé)接收和存儲生產(chǎn)者發(fā)送的數(shù)據(jù),并為消費者提供數(shù)據(jù)讀取服務(wù)。

二、Kafka 功能

消息隊列功能:Kafka 可以作為消息隊列使用,在應(yīng)用程序之間傳遞消息。生產(chǎn)者將消息發(fā)送到主題,不同的消費者可以從主題中訂閱并消費消息,實現(xiàn)應(yīng)用程序解耦。例如,在電商系統(tǒng)中,訂單生成模塊可以將訂單消息發(fā)送到 Kafka 主題,后續(xù)的庫存管理、物流配送等模塊可以從該主題消費訂單消息,各自獨立處理,降低模塊間的耦合度。
數(shù)據(jù)存儲功能:Kafka 具有持久化存儲能力,它將消息數(shù)據(jù)存儲在磁盤上,并且通過多副本機制保證數(shù)據(jù)的可靠性。即使某個節(jié)點出現(xiàn)故障,數(shù)據(jù)也不會丟失。這種特性使得 Kafka 不僅可以作為消息隊列,還能用于數(shù)據(jù)的長期存儲和備份,例如用于存儲系統(tǒng)的操作日志,方便后續(xù)的數(shù)據(jù)分析和故障排查。
流處理功能:Kafka 可以與流處理框架(如 Apache Flink、Spark Streaming 等)集成,對實時數(shù)據(jù)流進行處理。通過將實時數(shù)據(jù)發(fā)送到 Kafka 主題,流處理框架可以從主題中讀取數(shù)據(jù)并進行實時計算、分析和轉(zhuǎn)換。例如,在實時監(jiān)控系統(tǒng)中,通過 Kafka 收集服務(wù)器的性能指標(biāo)數(shù)據(jù),然后使用流處理框架對這些數(shù)據(jù)進行實時分析,及時發(fā)現(xiàn)性能異常并發(fā)出警報。

三、POM依賴

    <!-- kafka-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.11</version>
    </dependency>

四、配置文件

spring:
  # Kafka 配置
  kafka:
    # Kafka 服務(wù)器地址和端口 代理地址,可以多個
    bootstrap-servers: IP:9092
    # 生產(chǎn)者配置
    producer:
      # 發(fā)送失敗時的重試次數(shù)
      retries: 3
      # 每次批量發(fā)送消息的數(shù)量,調(diào)整為較小值
      batch-size: 1
      # 生產(chǎn)者緩沖區(qū)大小
      buffer-memory: 33554432
      # 消息 key 的序列化器,將 key 序列化為字節(jié)數(shù)組
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息 value 的序列化器,將消息體序列化為字節(jié)數(shù)組
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消費者配置
    consumer:
      # 當(dāng)沒有初始偏移量或當(dāng)前偏移量不存在時,從最早的消息開始消費
      auto-offset-reset: earliest
      # 是否自動提交偏移量
      enable-auto-commit: true
      # 自動提交偏移量的時間間隔(毫秒),延長自動提交時間間隔
      auto-commit-interval: 1000
      # 消息 key 的反序列化器,將字節(jié)數(shù)組反序列化為 key
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消息 value 的反序列化器,將字節(jié)數(shù)組反序列化為消息體
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

五、生產(chǎn)者

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * 生產(chǎn)者
 *
 * @author chenlei
 */
@Slf4j
@Component
public class KafkaProducer {
    /**
     * KafkaTemplate
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    /**
     * 發(fā)送消息到指定的 Kafka 主題,并可指定分組信息
     *
     * @param topic   消息要發(fā)送到的 Kafka 主題
     * @param message 要發(fā)送的消息內(nèi)容
     */
    public void sendMessage(String topic, String message) {
        // 使用 KafkaTemplate 發(fā)送消息,將消息發(fā)送到指定的主題
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // 消息發(fā)送成功后的處理邏輯,可根據(jù)需要添加
                log.info("已發(fā)送消息=[" + message + "],其偏移量=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                // 消息發(fā)送失敗后的處理邏輯,使用日志記錄異常
                log.error("發(fā)送消息=[" + message + "] 失敗", ex);
            }
        });
    }
}

六、消費者

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * @author 消費者
 * chenlei
 */
@Slf4j
@Component
public class KafkaConsumer {
    /**
     * 監(jiān)聽 Kafka 主題方法。
     *
     * @param record 從 Kafka 接收到的 ConsumerRecord,包含消息的鍵值對
     */
    @KafkaListener(topics = {"topic"}, groupId = "consumer.group-id", concurrency = "5")
    public void listen(ConsumerRecord<?, ?> record) {
        // 打印接收到的消息的詳細(xì)信息
        log.info("接收到 Kafka 消息: 主題 = {}, 分區(qū) = {}, 偏移量 = {}, 鍵 = {}, 值 = {}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

到此這篇關(guān)于Java中Springboot集成Kafka實現(xiàn)消息發(fā)送和接收的文章就介紹到這了,更多相關(guān)Springboot Kafka 消息發(fā)送和接收內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • springboot?log4j2日志框架整合與使用過程解析

    springboot?log4j2日志框架整合與使用過程解析

    這篇文章主要介紹了springboot?log4j2日志框架整合與使用,包括引入maven依賴和添加配置文件log4j2-spring.xml的相關(guān)知識,需要的朋友可以參考下
    2022-05-05
  • Spring TaskScheduler使用實例解析

    Spring TaskScheduler使用實例解析

    這篇文章主要介紹了Spring TaskScheduler使用實例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-11-11
  • 利用javaFX實現(xiàn)移動一個小球的示例代碼

    利用javaFX實現(xiàn)移動一個小球的示例代碼

    這篇文章主要介紹了利用javaFX實現(xiàn)移動一個小球的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09
  • Java從數(shù)據(jù)庫中讀取Blob對象圖片并顯示的方法

    Java從數(shù)據(jù)庫中讀取Blob對象圖片并顯示的方法

    這篇文章主要介紹了Java從數(shù)據(jù)庫中讀取Blob對象圖片并顯示的方法,實例分析了Java讀取數(shù)據(jù)庫中Blob對象圖片的技巧與操作方法,需要的朋友可以參考下
    2015-02-02
  • 基于java實現(xiàn)websocket代碼示例

    基于java實現(xiàn)websocket代碼示例

    這篇文章主要介紹了基于java實現(xiàn)websocket代碼示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-12-12
  • 詳解Spring Cloud Gateway 數(shù)據(jù)庫存儲路由信息的擴展方案

    詳解Spring Cloud Gateway 數(shù)據(jù)庫存儲路由信息的擴展方案

    這篇文章主要介紹了詳解Spring Cloud Gateway 數(shù)據(jù)庫存儲路由信息的擴展方案,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-11-11
  • Java經(jīng)典用法總結(jié)

    Java經(jīng)典用法總結(jié)

    這篇文章主要介紹了Java經(jīng)典用法總結(jié),在本文中,盡量收集一些java最常用的習(xí)慣用法,特別是很難猜到的用法,感興趣的小伙伴們可以參考一下
    2016-02-02
  • java在cmd中亂碼的問題解決

    java在cmd中亂碼的問題解決

    本文深入探討了在使用Java命令行cmd時可能出現(xiàn)的中文亂碼問題,并提供了兩種解決方案,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-03-03
  • SpringBoot接入ftp/ftps并上傳文件和配置的代碼指南

    SpringBoot接入ftp/ftps并上傳文件和配置的代碼指南

    接入ftp服務(wù)器,在springboot上實現(xiàn)起來也不算復(fù)雜,本文主要講下如何在springboot下接入ftp服務(wù)上傳文件,并對出現(xiàn)的問題做一些記錄,ftp服務(wù)的參數(shù)配置等,需要的朋友可以參考下
    2024-12-12
  • Spring?Batch批處理框架操作指南

    Spring?Batch批處理框架操作指南

    Spring?Batch?是?Spring?提供的一個數(shù)據(jù)處理框架。企業(yè)域中的許多應(yīng)用程序需要批量處理才能在關(guān)鍵任務(wù)環(huán)境中執(zhí)行業(yè)務(wù)操作,這篇文章主要介紹了Spring?Batch批處理框架操作指南,需要的朋友可以參考下
    2022-07-07

最新評論