亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Go庫實現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)

 更新時間:2025年09月27日 09:12:43   作者:福大大架構(gòu)師每日一題  
文章介紹使用docker在宿主機映射容器9092端口部署k3s,并使用Go庫實現(xiàn)Kafka消息的發(fā)送與接收,涉及segmentio/kafka-gogo、saramago和sarama等客戶端庫的應(yīng)用場景

kafka是什么

Kafka傳統(tǒng)定義 :Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域。 發(fā)布/訂閱 :消息的發(fā)布者不會將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息 分為不同的類別,訂閱者只接收感興趣的消息。

Kafka最新定義:Kafka是一個開源的分布式事件流平臺(Event Streaming Platform),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。

消息隊列的應(yīng)用場景無外乎是:削峰填谷、應(yīng)用解耦、異步處理等等,具體使用案例我們在之前講rabbitmq基礎(chǔ)篇已經(jīng)詳述過,這里不在做講述,這里說一下消息隊列的兩種模型:

  • 點對點模型 :也叫消息隊列模型。如果拿上面那個“民間版”的定義來說,那么系統(tǒng) A 發(fā)送的消息只能被系統(tǒng) B 接收,其他任何系統(tǒng)都不能讀取 A 發(fā)送的消息。日常生活的例子比如電話客服就屬于這種模型:同一個客戶呼入電話只能被一位客服人員處理,第二個客服人員不能為該客戶服務(wù)。
  • 發(fā)布 / 訂閱模型 :與上面不同的是,它有一個主題(Topic)的概念,你可以理解成邏輯語義相近的消息容器。該模型也有發(fā)送方和接收方,只不過提法不同。發(fā)送方也稱為發(fā)布者(Publisher),接收方稱為訂閱者(Subscriber)。和點對點模型不同的是,這個模型可能存在多個發(fā)布者向相同的主題發(fā)送消息,而訂閱者也可能存在多個,它們都能接收到相同主題的消息。生活中的報紙訂閱就是一種典型的發(fā)布 / 訂閱模型。

kafka基礎(chǔ)架構(gòu)和核心概念

在 Kafka 中,發(fā)布訂閱的對象是 主題(Topic ),你可以為每個業(yè)務(wù)、每個應(yīng)用甚至是每類數(shù)據(jù)都創(chuàng)建專屬的主題。

生產(chǎn)者(Producer) :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端,生產(chǎn)者程序通常持續(xù)不斷地向一個或多個主題發(fā)送消息。

消費者(Consumer) :消息消費者,向 kafka broker 取消息的客戶端,消費者就是訂閱這些主題消息的客戶端應(yīng)用程序。

和生產(chǎn)者類似,消費者也能夠同時訂閱多個主題的消息。我們把生產(chǎn)者和消費者統(tǒng)稱為客戶端(Clients)。你可以同時運行多個生產(chǎn)者和消費者實例,這些實例會不斷地向 Kafka 集群中的多個主題生產(chǎn)和消費消息。

消費者組Consumer Group** (CG)**:由多個 consumer 組成。消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費,消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。

Broker :一臺 kafka 服務(wù)器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic。

主題(topic) :可以理解為一個隊列,生產(chǎn)者和消費者面向的都是一個 topic;

分區(qū)(Partition) :為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker(即服務(wù)器)上, 一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列;

副本(Replica) :副本,為保證集群中的某個節(jié)點發(fā)生故障時,該節(jié)點上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機制,一個 topic 的每個分區(qū)都有若干個副本, 一個 leader 和若干個 follower 。

leader :每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費者消費數(shù)據(jù)的對 象都是 leader。

follower :每個分區(qū)多個副本中的“從”,實時從 leader 中同步數(shù)據(jù),保持和 leader 數(shù)據(jù) 的同步。leader 發(fā)生故障時,某個 follower 會成為新的 follower

副本的工作機制也很簡單:生產(chǎn)者總是向領(lǐng)導(dǎo)者副本寫消息;而消費者總是從領(lǐng)導(dǎo)者副本讀消息。至于追隨者副本,它只做一件事:向領(lǐng)導(dǎo)者副本發(fā)送請求,請求領(lǐng)導(dǎo)者把最新生產(chǎn)的消息發(fā)給它,這樣它能保持與領(lǐng)導(dǎo)者的同步。

Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個日志就是磁盤上一個只能追加寫(Append-only)消息的物理文件。因為只能追加寫入,故 避免了緩慢的隨機 I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實現(xiàn) Kafka 高吞吐量特性的一個重要手段 。不過如果你不停地向一個日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?簡單來說就是通過日志段(Log Segment)機制。在 Kafka 底層,一個日志又近一步細分成多個日志段,消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿了一個日志段后,Kafka 會自動切分出一個新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時任務(wù)會定期地檢查老的日志段是否能夠被刪除,從而實現(xiàn)回收磁盤空間的目的

Kafka優(yōu)缺點

優(yōu)點

高吞吐量:Kafka的順序日志機制和高可用性設(shè)計使其在高并發(fā)場景下表現(xiàn)出色。

擴展性強:通過分區(qū)和復(fù)制機制,Kafka能夠輕松擴展到多個節(jié)點。 ** easy to use**:Kafka提供了豐富的 API 和工具支持,簡化了集成和管理。

缺點

學(xué)習(xí)曲線:Kafka的發(fā)布-訂閱模型和分布式架構(gòu)對初次接觸者來說可能較為復(fù)雜。

配置敏感:Kafka的性能和穩(wěn)定性高度依賴于正確的配置和維護。

合規(guī)性與安全性在金融、醫(yī)療等高敏感領(lǐng)域,Kafka需要滿足嚴格的合規(guī)要求??梢酝ㄟ^配置安全機制(如認證、授權(quán))來確保數(shù)據(jù)的完整性和安全性。

Kafka注意事項

高并發(fā)與分區(qū)的管理在高并發(fā)場景下,合理的分區(qū)劃分和負載均衡是關(guān)鍵。如果分區(qū)數(shù)量過多或負載不平衡,可能導(dǎo)致節(jié)點資源浪費或消息延遲。

配置參數(shù)的優(yōu)化Kafka的性能參數(shù)(如生產(chǎn)速率、消費速率、分區(qū)數(shù)等)需要根據(jù)實際應(yīng)用場景進行調(diào)整。過高的生產(chǎn)速率可能導(dǎo)致消息堆積,而過低的消費速率則會增加客戶端的負載。

網(wǎng)絡(luò)穩(wěn)定性Kafka對網(wǎng)絡(luò)性能有較高的要求。在實際部署中,需要確保集群內(nèi)各節(jié)點之間的網(wǎng)絡(luò)帶寬足夠高,避免因網(wǎng)絡(luò)延遲或分區(qū)不一致導(dǎo)致的消息丟失或延遲處理。

集群的高可用性Kafka的高可用性依賴于集群的配置和管理。在部署時,需要確保節(jié)點的硬件配置一致,定期監(jiān)控集群狀態(tài),并及時處理節(jié)點故障。

監(jiān)控與運維Kafka的監(jiān)控是保障系統(tǒng)穩(wěn)定運行的關(guān)鍵??梢酝ㄟ^工具(如Prometheus、Grafana)實時監(jiān)控集群的性能、消息隊列的健康狀況以及消費者組的負載情況。

docker安裝命令,其中172.16.11.111是宿主機ip,14818是宿主機端口,對應(yīng)容器端口9092:

docker run -d \
  --name kafka \
  -p 14818:9092 \
  -p 9093:9093 \
  -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \
  -e TZ=Asia/Shanghai \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \
  -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \
  apache/kafka-native:4.1.0

k3s的yaml,其中172.16.11.111是宿主機ip,14818是宿主機端口,對應(yīng)容器端口9092:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        - name: kafka-fix-data-volume-permissions
          image: alpine
          imagePullPolicy: IfNotPresent
          command:
          - sh
          - -c
          - "chown -R 1000:1000 /tmp/kraft-combined-logs"
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: broker,controller
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092,CONTROLLER://:9093
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://172.16.11.111:14818
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: 1@localhost:9093
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_NUM_PARTITIONS
              value: "3"
            - name: KAFKA_LOG_DIRS
              value: /tmp/kraft-combined-logs
            - name: CLUSTER_ID
              value: "5L6g3nShT-eMCtK--X86sw"  # 固定集群ID,僅首次啟動格式化使用
          image: 'apache/kafka-native:4.1.0'
          imagePullPolicy: IfNotPresent
          name: kafka
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      volumes:
        - hostPath:
            path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
            type: DirectoryOrCreate
          name: volv
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  ports:
    - port: 9092
      protocol: TCP
      targetPort: 9092
      name: 9092-9092
    - port: 9093
      protocol: TCP
      targetPort: 9093
      name: 9093-9093
  selector:
    app: kafka
  type: NodePort

go發(fā)送kafka消息:github.com/segmentio/kafka-go

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// 創(chuàng)建一個Kafka writer(Producer)
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker 地址
		Topic:    "test-topic",                    // 發(fā)送的 topic
		Balancer: &kafka.LeastBytes{},             // 負載均衡策略
	})

	// 寫入消息
	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello Kafka from Go!"),
		},
	)

	if err != nil {
		log.Fatalf("could not write message: %v", err)
	}

	log.Println("Message sent successfully!")

	// 關(guān)閉 writer
	w.Close()
}

go接收kafka消息:github.com/segmentio/kafka-go

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// 創(chuàng)建 Kafka reader(Consumer)
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker 地址
		Topic:    "test-topic",                    // 訂閱的 topic
		GroupID:  "my-consumer-group",             // 消費者組,確保相同組會讀取上一 offset
		MinBytes: 10e3,                            // 最小fetch字節(jié)數(shù)
		MaxBytes: 10e6,                            // 最大fetch字節(jié)數(shù)
	})

	for {
		// 讀取消息(會自動從上次的 offset 開始)
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatalf("could not read message: %v", err)
		}
		log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
	}

	// r.Close() // 如果你打算退出循環(huán)時關(guān)閉
}

go發(fā)送kafka消息:github.com/IBM/sarama

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// 配置生產(chǎn)者
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true          // 確保消息發(fā)送成功
	config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本確認
	config.Producer.Retry.Max = 3                    // 重試次數(shù)

	// 重要:配置客戶端使用正確的主機
	config.Net.SASL.Enable = false
	config.Net.TLS.Enable = false
	config.Version = sarama.MaxVersion

	// 創(chuàng)建同步生產(chǎn)者
	producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
	if err != nil {
		log.Fatalf("創(chuàng)建生產(chǎn)者失敗: %v", err)
	}
	defer producer.Close()

	// 構(gòu)造消息
	message := &sarama.ProducerMessage{
		Topic: "test-topic",
		Key:   sarama.StringEncoder("message-key"),
		Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
	}

	// 發(fā)送消息
	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		log.Fatalf("發(fā)送消息失敗: %v", err)
	}

	fmt.Printf("消息發(fā)送成功! 分區(qū): %d, 偏移量: %d\n", partition, offset)
}

go接收kafka消息:github.com/IBM/sarama

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/IBM/sarama"
)

type Consumer struct{}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// 會話初始化,可以在這里做一些準備工作
	return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	// 會話結(jié)束時的清理操作
	return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// claim.Messages() 會不斷返回新消息
	for msg := range claim.Messages() {
		fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Value))

		// 標(biāo)記該消息已被處理,Kafka會自動保存offset
		session.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	// Kafka集群地址
	brokers := []string{"172.16.11.111:14818"}
	groupID := "my-group" // 消費者組ID,保持不變才能從上次offset消費
	topics := []string{"test-topic"}

	// 配置
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion // Kafka版本
	config.Consumer.Return.Errors = true

	// 非首次啟動時自動從上次位置開始
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	// OffsetNewest: 如果沒有歷史offset,從最新開始;
	// OffsetOldest: 如果沒有歷史offset,從最舊開始。

	// 創(chuàng)建消費者組
	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumerGroup.Close()

	consumer := &Consumer{}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for err := range consumerGroup.Errors() {
			log.Printf("Error: %v", err)
		}
	}()

	log.Println("Kafka consumer started...")
	// 優(yōu)雅退出
	go func() {
		sigchan := make(chan os.Signal, 1)
		signal.Notify(sigchan, os.Interrupt)
		<-sigchan
		cancel()
	}()

	// 循環(huán)消費
	for {
		if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
			log.Printf("Error from consumer: %v", err)
		}

		// 檢查退出
		if ctx.Err() != nil {
			return
		}
	}
}

到此這篇關(guān)于Go庫實現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)的文章就介紹到這了,更多相關(guān)docker和k3s實現(xiàn)go語言發(fā)送和接收kafka消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • GO常見的錯誤99%程序員會遇到(解決方法)

    GO常見的錯誤99%程序員會遇到(解決方法)

    這篇文章主要介紹了GO常見的錯誤99%程序員會遇到,本文給出了解決方法,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-01-01
  • go語言實現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例

    go語言實現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例

    這篇文章主要為大家介紹了go語言實現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-04-04
  • Go語言實現(xiàn)字符串搜索算法Boyer-Moore

    Go語言實現(xiàn)字符串搜索算法Boyer-Moore

    Boyer-Moore?算法是一種非常高效的字符串搜索算法,被廣泛的應(yīng)用于多種字符串搜索場景,下面我們就來學(xué)習(xí)一下如何利用Go語言實現(xiàn)這一字符串搜索算法吧
    2023-11-11
  • 一文詳解go中如何實現(xiàn)定時任務(wù)

    一文詳解go中如何實現(xiàn)定時任務(wù)

    定時任務(wù)是指按照預(yù)定的時間間隔或特定時間點自動執(zhí)行的計劃任務(wù)或操作,這篇文章主要為大家詳細介紹了go中是如何實現(xiàn)定時任務(wù)的,感興趣的可以了解下
    2023-11-11
  • 詳解Golang中下劃線的使用方法

    詳解Golang中下劃線的使用方法

    這篇文章主要介紹了詳解Golang中下劃線的使用方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • GO語言學(xué)習(xí)之語句塊的實現(xiàn)

    GO語言學(xué)習(xí)之語句塊的實現(xiàn)

    本文主要介紹了GO語言學(xué)習(xí)之語句塊的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-06-06
  • Golang?Template實現(xiàn)自定義函數(shù)的操作指南

    Golang?Template實現(xiàn)自定義函數(shù)的操作指南

    這篇文章主要為大家詳細介紹了Golang如何利用Template實現(xiàn)自定義函數(shù)的操作,文中的示例代碼簡潔易懂,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2025-02-02
  • Golang標(biāo)準庫syscall詳解(什么是系統(tǒng)調(diào)用)

    Golang標(biāo)準庫syscall詳解(什么是系統(tǒng)調(diào)用)

    最近在研究go語言,發(fā)現(xiàn)go語言系統(tǒng)調(diào)用源碼只有調(diào)用函數(shù)的定義,今天通過本文給大家分享Golang標(biāo)準庫syscall詳解及什么是系統(tǒng)調(diào)用,感興趣的朋友一起看看吧
    2021-05-05
  • Golang自動追蹤GitHub上熱門AI項目

    Golang自動追蹤GitHub上熱門AI項目

    這篇文章主要為大家介紹了Golang自動追蹤GitHub上熱門AI項目,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-12-12
  • Go??iota?常量基本語法介紹

    Go??iota?常量基本語法介紹

    這篇文章主要介紹了Go?為什么要設(shè)計?iota?常量,我們介紹了 Go 中 iota 的基本語法。同時基于歷史資料針對 iota 到底是什么,為什么要這么叫,又有什么用進行了一番研究,需要的朋友可以參考下
    2022-06-06

最新評論