亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

golang如何使用sarama訪問(wèn)kafka

 更新時(shí)間:2018年12月17日 09:21:35   作者:CodingCode  
這篇文章主要介紹了golang如何使用sarama訪問(wèn)kafka,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

下面一個(gè)客戶端代碼例子訪問(wèn)kafka服務(wù)器,來(lái)發(fā)送和接受消息。

使用方式

1、命令行參數(shù)

$ ./kafkaclient -h
Usage of ./client:
 -ca string
  CA Certificate (default "ca.pem")
 -cert string
  Client Certificate (default "cert.pem")
 -command string
  consumer|producer (default "consumer")
 -host string
  Common separated kafka hosts (default "localhost:9093")
 -key string
  Client Key (default "key.pem")
 -partition int
  Kafka topic partition
 -tls
  TLS enable
 -topic string
  Kafka topic (default "test--topic")

2、作為producer啟動(dòng)

$ ./kafkaclient -command producer \
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
 -tls -cert client.pem -key client.key -ca ca.pem \
 -host kafka1:9093,kafka2:9093

producer發(fā)送消息給kafka:

> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit

3、作為consumer啟動(dòng)

$ ./kafkaclient -command consumer \
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
 -tls -cert client.pem -key client.key -ca ca.pem \
 -host kafka1:9093,kafka2:9093

consumer從kafka接受消息:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

完整源代碼如下

這個(gè)代碼使用到了Shopify/sarama庫(kù),請(qǐng)自行下載使用。

$ cat kafkaclient.go
package main

import (
 "flag"
 "fmt"
 "log"
 "os"
 "io/ioutil"
 "bufio"
 "strings"

 "crypto/tls"
 "crypto/x509"

 "github.com/Shopify/sarama"
)

var (
 command  string
 tlsEnable bool
 hosts  string
 topic  string
 partition int
 clientcert string
 clientkey string
 cacert  string
)

func main() {
 flag.StringVar(&command, "command",  "consumer",   "consumer|producer")
 flag.BoolVar(&tlsEnable, "tls",   false,    "TLS enable")
 flag.StringVar(&hosts,  "host",   "localhost:9093", "Common separated kafka hosts")
 flag.StringVar(&topic,  "topic",  "test--topic",  "Kafka topic")
 flag.IntVar(&partition,  "partition", 0,     "Kafka topic partition")
 flag.StringVar(&clientcert, "cert",   "cert.pem",   "Client Certificate")
 flag.StringVar(&clientkey, "key",   "key.pem",   "Client Key")
 flag.StringVar(&cacert,  "ca",   "ca.pem",   "CA Certificate")
 flag.Parse()

 config := sarama.NewConfig()
 if tlsEnable {
  //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
  if err != nil {
   log.Fatal(err)
  }

  config.Net.TLS.Enable = true
  config.Net.TLS.Config = tlsConfig
 }
 client, err := sarama.NewClient(strings.Split(hosts, ","), config)
 if err != nil {
  log.Fatalf("unable to create kafka client: %q", err)
 }

 if command == "consumer" {
  consumer, err := sarama.NewConsumerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer consumer.Close()
  loopConsumer(consumer, topic, partition)
 } else {
  producer, err := sarama.NewAsyncProducerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer producer.Close()
  loopProducer(producer, topic, partition)
 }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
 // load client cert
 clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
 if err != nil {
  return nil, err
 }

 // load ca cert pool
 cacert, err := ioutil.ReadFile(cacertfile)
 if err != nil {
  return nil, err
 }
 cacertpool := x509.NewCertPool()
 cacertpool.AppendCertsFromPEM(cacert)

 // generate tlcconfig
 tlsConfig := tls.Config{}
 tlsConfig.RootCAs = cacertpool
 tlsConfig.Certificates = []tls.Certificate{clientcert}
 tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
 return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
 scanner := bufio.NewScanner(os.Stdin)
 fmt.Print("> ")
 for scanner.Scan() {
  text := scanner.Text()
  if text == "" {
  } else if text == "exit" || text == "quit" {
   break
  } else {
   producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
   log.Printf("Produced message: [%s]\n",text)
  }
  fmt.Print("> ")
 }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
 partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
 if err != nil {
  log.Println(err)
  return
 }
 defer partitionConsumer.Close()

 for {
  msg := <-partitionConsumer.Messages()
  log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
 }
}

編譯:

$ go build kafkaclient.go

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Go語(yǔ)言os包用法詳解

    Go語(yǔ)言os包用法詳解

    本文主要介紹了Go語(yǔ)言os包用法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • go語(yǔ)言LeetCode題解999可以被一步捕獲的棋子數(shù)

    go語(yǔ)言LeetCode題解999可以被一步捕獲的棋子數(shù)

    這篇文章主要為大家介紹了go語(yǔ)言LeetCode題解999可以被一步捕獲的棋子數(shù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-12-12
  • Golang 端口復(fù)用測(cè)試的實(shí)現(xiàn)

    Golang 端口復(fù)用測(cè)試的實(shí)現(xiàn)

    這篇文章主要介紹了Golang 端口復(fù)用測(cè)試的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Golang實(shí)現(xiàn)組合模式和裝飾模式實(shí)例詳解

    Golang實(shí)現(xiàn)組合模式和裝飾模式實(shí)例詳解

    這篇文章主要介紹了Golang實(shí)現(xiàn)組合模式和裝飾模式,本文介紹組合模式和裝飾模式,golang實(shí)現(xiàn)兩種模式有共同之處,但在具體應(yīng)用場(chǎng)景有差異。通過(guò)對(duì)比兩個(gè)模式,可以加深理解,需要的朋友可以參考下
    2022-11-11
  • golang如何通過(guò)viper讀取config.yaml文件

    golang如何通過(guò)viper讀取config.yaml文件

    這篇文章主要介紹了golang通過(guò)viper讀取config.yaml文件,圍繞golang讀取config.yaml文件的相關(guān)資料展開詳細(xì)內(nèi)容,需要的小伙伴可以參考一下
    2022-03-03
  • 使用go自定義prometheus的exporter

    使用go自定義prometheus的exporter

    在prometheus中如果要監(jiān)控服務(wù)器和應(yīng)用的各種指標(biāo),需要用各種各樣的exporter服務(wù),這篇文章主要介紹了使用go自定義prometheus的exporter,需要的朋友可以參考下
    2023-03-03
  • 淺談Go用于同步和并發(fā)控制的幾種常見(jiàn)鎖

    淺談Go用于同步和并發(fā)控制的幾種常見(jiàn)鎖

    本文主要介紹了淺談Go用于同步和并發(fā)控制的幾種常見(jiàn)鎖,包括互斥鎖、讀寫鎖和一次性鎖等,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-08-08
  • Go語(yǔ)言中new()和 make()的區(qū)別詳解

    Go語(yǔ)言中new()和 make()的區(qū)別詳解

    這篇文章主要介紹了Go語(yǔ)言中new()和 make()的區(qū)別詳解,本文講解了new 的主要特性、make 的主要特性,并對(duì)它們的區(qū)別做了總結(jié),需要的朋友可以參考下
    2014-10-10
  • Go字符串切片操作str1[:index]的使用

    Go字符串切片操作str1[:index]的使用

    Go字符串切片str1[:index]從起始位置0到index-1截取,不復(fù)制數(shù)據(jù),利用字符串不可變性和共享內(nèi)存機(jī)制提升性能,具有一定的參考價(jià)值,感興趣的可以了解一下
    2025-06-06
  • Go實(shí)現(xiàn)mongodb增刪改查工具類的代碼示例

    Go實(shí)現(xiàn)mongodb增刪改查工具類的代碼示例

    這篇文章主要給大家介紹了關(guān)于Go實(shí)現(xiàn)mongodb增刪改查工具類的相關(guān)資料,MongoDB是一個(gè)NoSQL數(shù)據(jù)庫(kù),它提供了靈活的文檔存儲(chǔ)模型以及強(qiáng)大的查詢和操作功能,需要的朋友可以參考下
    2023-10-10

最新評(píng)論