Go庫實現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)
kafka是什么
Kafka傳統(tǒng)定義 :Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域。 發(fā)布/訂閱 :消息的發(fā)布者不會將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息 分為不同的類別,訂閱者只接收感興趣的消息。
Kafka最新定義:Kafka是一個開源的分布式事件流平臺(Event Streaming Platform),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。
消息隊列的應(yīng)用場景無外乎是:削峰填谷、應(yīng)用解耦、異步處理等等,具體使用案例我們在之前講rabbitmq基礎(chǔ)篇已經(jīng)詳述過,這里不在做講述,這里說一下消息隊列的兩種模型:
- 點對點模型 :也叫消息隊列模型。如果拿上面那個“民間版”的定義來說,那么系統(tǒng) A 發(fā)送的消息只能被系統(tǒng) B 接收,其他任何系統(tǒng)都不能讀取 A 發(fā)送的消息。日常生活的例子比如電話客服就屬于這種模型:同一個客戶呼入電話只能被一位客服人員處理,第二個客服人員不能為該客戶服務(wù)。
- 發(fā)布 / 訂閱模型 :與上面不同的是,它有一個主題(Topic)的概念,你可以理解成邏輯語義相近的消息容器。該模型也有發(fā)送方和接收方,只不過提法不同。發(fā)送方也稱為發(fā)布者(Publisher),接收方稱為訂閱者(Subscriber)。和點對點模型不同的是,這個模型可能存在多個發(fā)布者向相同的主題發(fā)送消息,而訂閱者也可能存在多個,它們都能接收到相同主題的消息。生活中的報紙訂閱就是一種典型的發(fā)布 / 訂閱模型。
kafka基礎(chǔ)架構(gòu)和核心概念

在 Kafka 中,發(fā)布訂閱的對象是 主題(Topic ),你可以為每個業(yè)務(wù)、每個應(yīng)用甚至是每類數(shù)據(jù)都創(chuàng)建專屬的主題。
生產(chǎn)者(Producer) :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端,生產(chǎn)者程序通常持續(xù)不斷地向一個或多個主題發(fā)送消息。
消費者(Consumer) :消息消費者,向 kafka broker 取消息的客戶端,消費者就是訂閱這些主題消息的客戶端應(yīng)用程序。
和生產(chǎn)者類似,消費者也能夠同時訂閱多個主題的消息。我們把生產(chǎn)者和消費者統(tǒng)稱為客戶端(Clients)。你可以同時運行多個生產(chǎn)者和消費者實例,這些實例會不斷地向 Kafka 集群中的多個主題生產(chǎn)和消費消息。
消費者組Consumer Group** (CG)**:由多個 consumer 組成。消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費,消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。
Broker :一臺 kafka 服務(wù)器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic。
主題(topic) :可以理解為一個隊列,生產(chǎn)者和消費者面向的都是一個 topic;
分區(qū)(Partition) :為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker(即服務(wù)器)上, 一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列;
副本(Replica) :副本,為保證集群中的某個節(jié)點發(fā)生故障時,該節(jié)點上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機制,一個 topic 的每個分區(qū)都有若干個副本, 一個 leader 和若干個 follower 。
leader :每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費者消費數(shù)據(jù)的對 象都是 leader。
follower :每個分區(qū)多個副本中的“從”,實時從 leader 中同步數(shù)據(jù),保持和 leader 數(shù)據(jù) 的同步。leader 發(fā)生故障時,某個 follower 會成為新的 follower
副本的工作機制也很簡單:生產(chǎn)者總是向領(lǐng)導(dǎo)者副本寫消息;而消費者總是從領(lǐng)導(dǎo)者副本讀消息。至于追隨者副本,它只做一件事:向領(lǐng)導(dǎo)者副本發(fā)送請求,請求領(lǐng)導(dǎo)者把最新生產(chǎn)的消息發(fā)給它,這樣它能保持與領(lǐng)導(dǎo)者的同步。
Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個日志就是磁盤上一個只能追加寫(Append-only)消息的物理文件。因為只能追加寫入,故 避免了緩慢的隨機 I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實現(xiàn) Kafka 高吞吐量特性的一個重要手段 。不過如果你不停地向一個日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?簡單來說就是通過日志段(Log Segment)機制。在 Kafka 底層,一個日志又近一步細分成多個日志段,消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿了一個日志段后,Kafka 會自動切分出一個新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時任務(wù)會定期地檢查老的日志段是否能夠被刪除,從而實現(xiàn)回收磁盤空間的目的
Kafka優(yōu)缺點
優(yōu)點
高吞吐量:Kafka的順序日志機制和高可用性設(shè)計使其在高并發(fā)場景下表現(xiàn)出色。
擴展性強:通過分區(qū)和復(fù)制機制,Kafka能夠輕松擴展到多個節(jié)點。 ** easy to use**:Kafka提供了豐富的 API 和工具支持,簡化了集成和管理。
缺點
學(xué)習(xí)曲線:Kafka的發(fā)布-訂閱模型和分布式架構(gòu)對初次接觸者來說可能較為復(fù)雜。
配置敏感:Kafka的性能和穩(wěn)定性高度依賴于正確的配置和維護。
合規(guī)性與安全性在金融、醫(yī)療等高敏感領(lǐng)域,Kafka需要滿足嚴格的合規(guī)要求??梢酝ㄟ^配置安全機制(如認證、授權(quán))來確保數(shù)據(jù)的完整性和安全性。
Kafka注意事項
高并發(fā)與分區(qū)的管理在高并發(fā)場景下,合理的分區(qū)劃分和負載均衡是關(guān)鍵。如果分區(qū)數(shù)量過多或負載不平衡,可能導(dǎo)致節(jié)點資源浪費或消息延遲。
配置參數(shù)的優(yōu)化Kafka的性能參數(shù)(如生產(chǎn)速率、消費速率、分區(qū)數(shù)等)需要根據(jù)實際應(yīng)用場景進行調(diào)整。過高的生產(chǎn)速率可能導(dǎo)致消息堆積,而過低的消費速率則會增加客戶端的負載。
網(wǎng)絡(luò)穩(wěn)定性Kafka對網(wǎng)絡(luò)性能有較高的要求。在實際部署中,需要確保集群內(nèi)各節(jié)點之間的網(wǎng)絡(luò)帶寬足夠高,避免因網(wǎng)絡(luò)延遲或分區(qū)不一致導(dǎo)致的消息丟失或延遲處理。
集群的高可用性Kafka的高可用性依賴于集群的配置和管理。在部署時,需要確保節(jié)點的硬件配置一致,定期監(jiān)控集群狀態(tài),并及時處理節(jié)點故障。
監(jiān)控與運維Kafka的監(jiān)控是保障系統(tǒng)穩(wěn)定運行的關(guān)鍵??梢酝ㄟ^工具(如Prometheus、Grafana)實時監(jiān)控集群的性能、消息隊列的健康狀況以及消費者組的負載情況。
docker安裝命令,其中172.16.11.111是宿主機ip,14818是宿主機端口,對應(yīng)容器端口9092:
docker run -d \ --name kafka \ -p 14818:9092 \ -p 9093:9093 \ -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \ -e TZ=Asia/Shanghai \ -e KAFKA_NODE_ID=1 \ -e KAFKA_PROCESS_ROLES=broker,controller \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \ -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \ -e KAFKA_NUM_PARTITIONS=3 \ -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \ -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \ apache/kafka-native:4.1.0
k3s的yaml,其中172.16.11.111是宿主機ip,14818是宿主機端口,對應(yīng)容器端口9092:
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: kafka
name: kafka
namespace: moonfdd
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
initContainers:
- name: kafka-fix-data-volume-permissions
image: alpine
imagePullPolicy: IfNotPresent
command:
- sh
- -c
- "chown -R 1000:1000 /tmp/kraft-combined-logs"
volumeMounts:
- mountPath: /tmp/kraft-combined-logs
name: volv
containers:
- env:
- name: TZ
value: Asia/Shanghai
- name: KAFKA_NODE_ID
value: "1"
- name: KAFKA_PROCESS_ROLES
value: broker,controller
- name: KAFKA_LISTENERS
value: PLAINTEXT://:9092,CONTROLLER://:9093
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://172.16.11.111:14818
- name: KAFKA_CONTROLLER_LISTENER_NAMES
value: CONTROLLER
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_CONTROLLER_QUORUM_VOTERS
value: 1@localhost:9093
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "1"
- name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
value: "0"
- name: KAFKA_NUM_PARTITIONS
value: "3"
- name: KAFKA_LOG_DIRS
value: /tmp/kraft-combined-logs
- name: CLUSTER_ID
value: "5L6g3nShT-eMCtK--X86sw" # 固定集群ID,僅首次啟動格式化使用
image: 'apache/kafka-native:4.1.0'
imagePullPolicy: IfNotPresent
name: kafka
volumeMounts:
- mountPath: /tmp/kraft-combined-logs
name: volv
volumes:
- hostPath:
path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
type: DirectoryOrCreate
name: volv
---
apiVersion: v1
kind: Service
metadata:
labels:
app: kafka
name: kafka
namespace: moonfdd
spec:
ports:
- port: 9092
protocol: TCP
targetPort: 9092
name: 9092-9092
- port: 9093
protocol: TCP
targetPort: 9093
name: 9093-9093
selector:
app: kafka
type: NodePort
go發(fā)送kafka消息:github.com/segmentio/kafka-go
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// 創(chuàng)建一個Kafka writer(Producer)
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"172.16.11.111:14818"}, // Kafka broker 地址
Topic: "test-topic", // 發(fā)送的 topic
Balancer: &kafka.LeastBytes{}, // 負載均衡策略
})
// 寫入消息
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello Kafka from Go!"),
},
)
if err != nil {
log.Fatalf("could not write message: %v", err)
}
log.Println("Message sent successfully!")
// 關(guān)閉 writer
w.Close()
}
go接收kafka消息:github.com/segmentio/kafka-go
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// 創(chuàng)建 Kafka reader(Consumer)
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"172.16.11.111:14818"}, // Kafka broker 地址
Topic: "test-topic", // 訂閱的 topic
GroupID: "my-consumer-group", // 消費者組,確保相同組會讀取上一 offset
MinBytes: 10e3, // 最小fetch字節(jié)數(shù)
MaxBytes: 10e6, // 最大fetch字節(jié)數(shù)
})
for {
// 讀取消息(會自動從上次的 offset 開始)
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Fatalf("could not read message: %v", err)
}
log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
}
// r.Close() // 如果你打算退出循環(huán)時關(guān)閉
}
go發(fā)送kafka消息:github.com/IBM/sarama
package main
import (
"fmt"
"log"
"time"
"github.com/IBM/sarama"
)
func main() {
// 配置生產(chǎn)者
config := sarama.NewConfig()
config.Producer.Return.Successes = true // 確保消息發(fā)送成功
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本確認
config.Producer.Retry.Max = 3 // 重試次數(shù)
// 重要:配置客戶端使用正確的主機
config.Net.SASL.Enable = false
config.Net.TLS.Enable = false
config.Version = sarama.MaxVersion
// 創(chuàng)建同步生產(chǎn)者
producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
if err != nil {
log.Fatalf("創(chuàng)建生產(chǎn)者失敗: %v", err)
}
defer producer.Close()
// 構(gòu)造消息
message := &sarama.ProducerMessage{
Topic: "test-topic",
Key: sarama.StringEncoder("message-key"),
Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
}
// 發(fā)送消息
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatalf("發(fā)送消息失敗: %v", err)
}
fmt.Printf("消息發(fā)送成功! 分區(qū): %d, 偏移量: %d\n", partition, offset)
}
go接收kafka消息:github.com/IBM/sarama
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"github.com/IBM/sarama"
)
type Consumer struct{}
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// 會話初始化,可以在這里做一些準備工作
return nil
}
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
// 會話結(jié)束時的清理操作
return nil
}
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// claim.Messages() 會不斷返回新消息
for msg := range claim.Messages() {
fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 標(biāo)記該消息已被處理,Kafka會自動保存offset
session.MarkMessage(msg, "")
}
return nil
}
func main() {
// Kafka集群地址
brokers := []string{"172.16.11.111:14818"}
groupID := "my-group" // 消費者組ID,保持不變才能從上次offset消費
topics := []string{"test-topic"}
// 配置
config := sarama.NewConfig()
config.Version = sarama.MaxVersion // Kafka版本
config.Consumer.Return.Errors = true
// 非首次啟動時自動從上次位置開始
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// OffsetNewest: 如果沒有歷史offset,從最新開始;
// OffsetOldest: 如果沒有歷史offset,從最舊開始。
// 創(chuàng)建消費者組
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer consumerGroup.Close()
consumer := &Consumer{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for err := range consumerGroup.Errors() {
log.Printf("Error: %v", err)
}
}()
log.Println("Kafka consumer started...")
// 優(yōu)雅退出
go func() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
<-sigchan
cancel()
}()
// 循環(huán)消費
for {
if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
log.Printf("Error from consumer: %v", err)
}
// 檢查退出
if ctx.Err() != nil {
return
}
}
}
到此這篇關(guān)于Go庫實現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)的文章就介紹到這了,更多相關(guān)docker和k3s實現(xiàn)go語言發(fā)送和接收kafka消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言實現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例
這篇文章主要為大家介紹了go語言實現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-04-04
Go語言實現(xiàn)字符串搜索算法Boyer-Moore
Boyer-Moore?算法是一種非常高效的字符串搜索算法,被廣泛的應(yīng)用于多種字符串搜索場景,下面我們就來學(xué)習(xí)一下如何利用Go語言實現(xiàn)這一字符串搜索算法吧2023-11-11
Golang?Template實現(xiàn)自定義函數(shù)的操作指南
這篇文章主要為大家詳細介紹了Golang如何利用Template實現(xiàn)自定義函數(shù)的操作,文中的示例代碼簡潔易懂,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-02-02
Golang標(biāo)準庫syscall詳解(什么是系統(tǒng)調(diào)用)
最近在研究go語言,發(fā)現(xiàn)go語言系統(tǒng)調(diào)用源碼只有調(diào)用函數(shù)的定義,今天通過本文給大家分享Golang標(biāo)準庫syscall詳解及什么是系統(tǒng)調(diào)用,感興趣的朋友一起看看吧2021-05-05

