亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

一文詳解Golang連接kafka的基本操作

 更新時(shí)間:2025年03月10日 10:39:04   作者:執(zhí)念斬長河  
這篇文章主要為大家詳細(xì)介紹了Golang中連接kafka的基本操作的相關(guān)知識,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下

1.kafka的學(xué)習(xí)

1.1 啟動(dòng)kafka與zookeeper

kafka與zookeeper是相關(guān)聯(lián)的

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

1.2 創(chuàng)建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server 主機(jī)名:9092

1.3 生產(chǎn)消息

bin/kafka-console-producer.sh --broker-list 主機(jī)名:9092 --topic hello

運(yùn)行后可以發(fā)送多條,ctrl+c退出

1.4 消費(fèi)之前的消息

bin/kafka-console-consumer.sh --bootstrap-server 主機(jī)名:9092 --from-beginning --topic hello

1.5 指定偏移量消費(fèi)

bin/kafka-console-consumer.sh --bootstrap-server 主機(jī)名:9092 --partition 0 --offset 1 --topic hello

1.6 消費(fèi)最新的信息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

2 go操作

2.1 發(fā)送消息

// Kafka 配置
const (
	KafkaBroker = "u8sMaster:9092" // 替換為你的 Kafka Broker 地址
	KafkaTopic  = "k8s-version"          // Kafka 主題
)

func main() {
	sendMesgKafka()
}

func sendMesgKafka() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{KafkaBroker},
		Topic:    KafkaTopic,
		Balancer: &kafka.LeastBytes{},
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("one!"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("two!"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("three!"),
		},
	)

	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println("Message sent successfully")

}

2.2 消費(fèi)消息

// to consume messages
topic := "test"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b[:n]))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:", err)
}

2.3 列出所有topic

func main() {
    conn, err := kafka.Dial("tcp", "u8sMaster:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()
    
    partitions, err := conn.ReadPartitions()
    if err != nil {
        panic(err.Error())
    }
    
    m := map[string]struct{}{}
    
    for _, p := range partitions {
        m[p.Topic] = struct{}{}
    }
    for k := range m {
        fmt.Println(k)
    }
}

2.4 創(chuàng)建topic

func main() {
        conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0)
        if err != nil {
            panic(err.Error())
        }
}

精準(zhǔn)地創(chuàng)建topic

func main() {
    conn, err := kafka.Dial("tcp", "u8sMaster:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()
    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }
    var connLeader *kafka.Conn
    connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer connLeader.Close()
}

這里省略了kafka集群的配置,未來有機(jī)會補(bǔ)充

以上就是一文詳解Golang連接kafka的基本操作的詳細(xì)內(nèi)容,更多關(guān)于go連接kafka的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Go實(shí)現(xiàn)分布式系統(tǒng)高可用限流器實(shí)戰(zhàn)

    Go實(shí)現(xiàn)分布式系統(tǒng)高可用限流器實(shí)戰(zhàn)

    這篇文章主要為大家介紹了Go實(shí)現(xiàn)分布式系統(tǒng)高可用限流器實(shí)戰(zhàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06
  • 淺析Go語言中的超時(shí)控制

    淺析Go語言中的超時(shí)控制

    日常開發(fā)中我們大概率會遇到超時(shí)控制的場景,而一個(gè)良好的超時(shí)控制可以有效的避免一些問題,所以本文就來和大家深入探討一下Go語言中的超時(shí)控制吧
    2023-10-10
  • Go高級特性探究之優(yōu)先級隊(duì)列詳解

    Go高級特性探究之優(yōu)先級隊(duì)列詳解

    Heap?是一種數(shù)據(jù)結(jié)構(gòu),這種數(shù)據(jù)結(jié)構(gòu)常用于實(shí)現(xiàn)優(yōu)先隊(duì)列,這篇文章主要就是來和大家深入探討一下GO語言中的優(yōu)先級隊(duì)列,感興趣的可以了解一下
    2023-06-06
  • 手把手帶你走進(jìn)Go語言之條件表達(dá)式

    手把手帶你走進(jìn)Go語言之條件表達(dá)式

    條件表達(dá)式由條件運(yùn)算符構(gòu)成,并常用條件表達(dá)式構(gòu)成一個(gè)賦值語句,本文給大家介紹了在Go語言中條件表達(dá)式的具體用法,講述的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值
    2021-09-09
  • 詳解Go語言的錯(cuò)誤處理和資源管理

    詳解Go語言的錯(cuò)誤處理和資源管理

    資源處理是什么?打開文件需要關(guān)閉,打開數(shù)據(jù)庫連接,連接需要釋放。這些成對出現(xiàn)的就是資源管理。有時(shí)候我們雖然釋放了,但是程序在中間出錯(cuò)了,那么可能導(dǎo)致資源釋放失敗。如何保證打開的文件一定會被關(guān)閉呢?這就是資源管理與錯(cuò)誤處理考慮的一個(gè)原因
    2021-06-06
  • Golang使用channel實(shí)現(xiàn)一個(gè)優(yōu)雅退出功能

    Golang使用channel實(shí)現(xiàn)一個(gè)優(yōu)雅退出功能

    最近補(bǔ)?Golang?channel?方面八股的時(shí)候發(fā)現(xiàn)用?channel?實(shí)現(xiàn)一個(gè)優(yōu)雅退出功能好像不是很難,之前寫的?HTTP?框架剛好也不支持優(yōu)雅退出功能,于是就參考了?Hertz?優(yōu)雅退出方面的代碼,為我的?PIANO?補(bǔ)足了這個(gè)?feature
    2023-03-03
  • Go與Redis實(shí)現(xiàn)分布式互斥鎖和紅鎖

    Go與Redis實(shí)現(xiàn)分布式互斥鎖和紅鎖

    這篇文章主要介紹了Go與Redis實(shí)現(xiàn)分布式互斥鎖和紅鎖,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-09-09
  • Go數(shù)組與切片輕松掌握

    Go數(shù)組與切片輕松掌握

    在Java的核心庫中,集合框架可謂鼎鼎大名:Array、List、Set等等,隨便拎一個(gè)出來都值得開發(fā)者好好學(xué)習(xí)如何使用甚至是背后的設(shè)計(jì)源碼。雖然Go語言沒有如此豐富的容器類型,但也有一些基本的容器供開發(fā)者使用,接下來讓我們認(rèn)識一下這些容器類型吧
    2022-11-11
  • golang?gorm更新日志執(zhí)行SQL示例詳解

    golang?gorm更新日志執(zhí)行SQL示例詳解

    這篇文章主要為大家介紹了golang?gorm更新日志執(zhí)行SQL示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪
    2022-04-04
  • Go 結(jié)構(gòu)體、數(shù)組、字典和 json 字符串的相互轉(zhuǎn)換方法

    Go 結(jié)構(gòu)體、數(shù)組、字典和 json 字符串的相互轉(zhuǎn)換方法

    今天小編就為大家分享一篇Go 結(jié)構(gòu)體、數(shù)組、字典和 json 字符串的相互轉(zhuǎn)換方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-08-08

最新評論