golang如何使用sarama訪問(wèn)kafka
下面一個(gè)客戶端代碼例子訪問(wèn)kafka服務(wù)器,來(lái)發(fā)送和接受消息。
使用方式
1、命令行參數(shù)
$ ./kafkaclient -h Usage of ./client: -ca string CA Certificate (default "ca.pem") -cert string Client Certificate (default "cert.pem") -command string consumer|producer (default "consumer") -host string Common separated kafka hosts (default "localhost:9093") -key string Client Key (default "key.pem") -partition int Kafka topic partition -tls TLS enable -topic string Kafka topic (default "test--topic")
2、作為producer啟動(dòng)
$ ./kafkaclient -command producer \ -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command producer \ -tls -cert client.pem -key client.key -ca ca.pem \ -host kafka1:9093,kafka2:9093
producer發(fā)送消息給kafka:
> aaa 2018/12/15 07:11:21 Produced message: [aaa] > bbb 2018/12/15 07:11:30 Produced message: [bbb] > quit
3、作為consumer啟動(dòng)
$ ./kafkaclient -command consumer \ -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command consumer \ -tls -cert client.pem -key client.key -ca ca.pem \ -host kafka1:9093,kafka2:9093
consumer從kafka接受消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]
完整源代碼如下
這個(gè)代碼使用到了Shopify/sarama庫(kù),請(qǐng)自行下載使用。
$ cat kafkaclient.go
package main
import (
"flag"
"fmt"
"log"
"os"
"io/ioutil"
"bufio"
"strings"
"crypto/tls"
"crypto/x509"
"github.com/Shopify/sarama"
)
var (
command string
tlsEnable bool
hosts string
topic string
partition int
clientcert string
clientkey string
cacert string
)
func main() {
flag.StringVar(&command, "command", "consumer", "consumer|producer")
flag.BoolVar(&tlsEnable, "tls", false, "TLS enable")
flag.StringVar(&hosts, "host", "localhost:9093", "Common separated kafka hosts")
flag.StringVar(&topic, "topic", "test--topic", "Kafka topic")
flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate")
flag.StringVar(&clientkey, "key", "key.pem", "Client Key")
flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate")
flag.Parse()
config := sarama.NewConfig()
if tlsEnable {
//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
if err != nil {
log.Fatal(err)
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
if command == "consumer" {
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
loopConsumer(consumer, topic, partition)
} else {
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
loopProducer(producer, topic, partition)
}
}
func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
// load client cert
clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
if err != nil {
return nil, err
}
// load ca cert pool
cacert, err := ioutil.ReadFile(cacertfile)
if err != nil {
return nil, err
}
cacertpool := x509.NewCertPool()
cacertpool.AppendCertsFromPEM(cacert)
// generate tlcconfig
tlsConfig := tls.Config{}
tlsConfig.RootCAs = cacertpool
tlsConfig.Certificates = []tls.Certificate{clientcert}
tlsConfig.BuildNameToCertificate()
// tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
return &tlsConfig, err
}
func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("> ")
for scanner.Scan() {
text := scanner.Text()
if text == "" {
} else if text == "exit" || text == "quit" {
break
} else {
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
}
fmt.Print("> ")
}
}
func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
log.Println(err)
return
}
defer partitionConsumer.Close()
for {
msg := <-partitionConsumer.Messages()
log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
}
}
編譯:
$ go build kafkaclient.go
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
go語(yǔ)言LeetCode題解999可以被一步捕獲的棋子數(shù)
這篇文章主要為大家介紹了go語(yǔ)言LeetCode題解999可以被一步捕獲的棋子數(shù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
Golang 端口復(fù)用測(cè)試的實(shí)現(xiàn)
這篇文章主要介紹了Golang 端口復(fù)用測(cè)試的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Golang實(shí)現(xiàn)組合模式和裝飾模式實(shí)例詳解
這篇文章主要介紹了Golang實(shí)現(xiàn)組合模式和裝飾模式,本文介紹組合模式和裝飾模式,golang實(shí)現(xiàn)兩種模式有共同之處,但在具體應(yīng)用場(chǎng)景有差異。通過(guò)對(duì)比兩個(gè)模式,可以加深理解,需要的朋友可以參考下2022-11-11
golang如何通過(guò)viper讀取config.yaml文件
這篇文章主要介紹了golang通過(guò)viper讀取config.yaml文件,圍繞golang讀取config.yaml文件的相關(guān)資料展開詳細(xì)內(nèi)容,需要的小伙伴可以參考一下2022-03-03
Go語(yǔ)言中new()和 make()的區(qū)別詳解
這篇文章主要介紹了Go語(yǔ)言中new()和 make()的區(qū)別詳解,本文講解了new 的主要特性、make 的主要特性,并對(duì)它們的區(qū)別做了總結(jié),需要的朋友可以參考下2014-10-10
Go實(shí)現(xiàn)mongodb增刪改查工具類的代碼示例
這篇文章主要給大家介紹了關(guān)于Go實(shí)現(xiàn)mongodb增刪改查工具類的相關(guān)資料,MongoDB是一個(gè)NoSQL數(shù)據(jù)庫(kù),它提供了靈活的文檔存儲(chǔ)模型以及強(qiáng)大的查詢和操作功能,需要的朋友可以參考下2023-10-10

