一文詳解Golang連接kafka的基本操作
1.kafka的學(xué)習(xí)
1.1 啟動(dòng)kafka與zookeeper
kafka與zookeeper是相關(guān)聯(lián)的
bin/zookeeper-server-start.sh config/zookeeper.properties
與
bin/kafka-server-start.sh config/server.properties
1.2 創(chuàng)建topic
bin/kafka-topics.sh --create --topic hello --bootstrap-server 主機(jī)名:9092
1.3 生產(chǎn)消息
bin/kafka-console-producer.sh --broker-list 主機(jī)名:9092 --topic hello
運(yùn)行后可以發(fā)送多條,ctrl+c退出
1.4 消費(fèi)之前的消息
bin/kafka-console-consumer.sh --bootstrap-server 主機(jī)名:9092 --from-beginning --topic hello
1.5 指定偏移量消費(fèi)
bin/kafka-console-consumer.sh --bootstrap-server 主機(jī)名:9092 --partition 0 --offset 1 --topic hello
1.6 消費(fèi)最新的信息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
2 go操作
2.1 發(fā)送消息
// Kafka 配置 const ( KafkaBroker = "u8sMaster:9092" // 替換為你的 Kafka Broker 地址 KafkaTopic = "k8s-version" // Kafka 主題 ) func main() { sendMesgKafka() } func sendMesgKafka() { w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{KafkaBroker}, Topic: KafkaTopic, Balancer: &kafka.LeastBytes{}, }) err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("one!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("two!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("three!"), }, ) if err != nil { log.Fatal("failed to write messages:", err) } if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err) } fmt.Println("Message sent successfully") }
2.2 消費(fèi)消息
// to consume messages topic := "test" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition) if err != nil { log.Fatal("failed to dial leader:", err) } conn.SetReadDeadline(time.Now().Add(10*time.Second)) batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max b := make([]byte, 10e3) // 10KB max per message for { n, err := batch.Read(b) if err != nil { break } fmt.Println(string(b[:n])) } if err := batch.Close(); err != nil { log.Fatal("failed to close batch:", err) } if err := conn.Close(); err != nil { log.Fatal("failed to close connection:", err) }
2.3 列出所有topic
func main() { conn, err := kafka.Dial("tcp", "u8sMaster:9092") if err != nil { panic(err.Error()) } defer conn.Close() partitions, err := conn.ReadPartitions() if err != nil { panic(err.Error()) } m := map[string]struct{}{} for _, p := range partitions { m[p.Topic] = struct{}{} } for k := range m { fmt.Println(k) } }
2.4 創(chuàng)建topic
func main() { conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0) if err != nil { panic(err.Error()) } }
精準(zhǔn)地創(chuàng)建topic
func main() { conn, err := kafka.Dial("tcp", "u8sMaster:9092") if err != nil { panic(err.Error()) } defer conn.Close() controller, err := conn.Controller() if err != nil { panic(err.Error()) } var connLeader *kafka.Conn connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer connLeader.Close() }
這里省略了kafka集群的配置,未來有機(jī)會補(bǔ)充
以上就是一文詳解Golang連接kafka的基本操作的詳細(xì)內(nèi)容,更多關(guān)于go連接kafka的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go實(shí)現(xiàn)分布式系統(tǒng)高可用限流器實(shí)戰(zhàn)
這篇文章主要為大家介紹了Go實(shí)現(xiàn)分布式系統(tǒng)高可用限流器實(shí)戰(zhàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Golang使用channel實(shí)現(xiàn)一個(gè)優(yōu)雅退出功能
最近補(bǔ)?Golang?channel?方面八股的時(shí)候發(fā)現(xiàn)用?channel?實(shí)現(xiàn)一個(gè)優(yōu)雅退出功能好像不是很難,之前寫的?HTTP?框架剛好也不支持優(yōu)雅退出功能,于是就參考了?Hertz?優(yōu)雅退出方面的代碼,為我的?PIANO?補(bǔ)足了這個(gè)?feature2023-03-03Go與Redis實(shí)現(xiàn)分布式互斥鎖和紅鎖
這篇文章主要介紹了Go與Redis實(shí)現(xiàn)分布式互斥鎖和紅鎖,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09Go 結(jié)構(gòu)體、數(shù)組、字典和 json 字符串的相互轉(zhuǎn)換方法
今天小編就為大家分享一篇Go 結(jié)構(gòu)體、數(shù)組、字典和 json 字符串的相互轉(zhuǎn)換方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08