亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

使用go實(shí)現(xiàn)一個(gè)超級(jí)mini的消息隊(duì)列的示例代碼

 更新時(shí)間:2021年12月14日 15:27:34   作者:壯士斷臂  
本文主要介紹了使用go實(shí)現(xiàn)一個(gè)超級(jí)mini的消息隊(duì)列的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

前言

趁著有空余時(shí)間,就想著擼一個(gè)mini的生產(chǎn)-消費(fèi)消息隊(duì)列,說(shuō)干就干了。自己是個(gè)javer,這次實(shí)現(xiàn),特意換用了go。沒(méi)錯(cuò),是零基礎(chǔ)上手go,順便可以學(xué)學(xué)go。

前置知識(shí):

  1. go基本語(yǔ)法
  2. 消息隊(duì)列概念,也就三個(gè):生產(chǎn)者、消費(fèi)者、隊(duì)列

目的

  • 沒(méi)想著實(shí)現(xiàn)多復(fù)雜,因?yàn)闀r(shí)間有限,就mini就好,mini到什么程度呢
  • 使用雙向鏈表數(shù)據(jù)結(jié)構(gòu)作為隊(duì)列
  • 有多個(gè)topic可供生產(chǎn)者生成消息和消費(fèi)者消費(fèi)消息
  • 支持生產(chǎn)者并發(fā)寫
  • 支持消費(fèi)者讀,且ok后,從隊(duì)列刪除
  • 消息不丟失(持久化)
  • 高性能(先這樣想)

設(shè)計(jì)

整體架構(gòu)

協(xié)議

通訊協(xié)議底層使用tcp,mq是基于tcp自定義了一個(gè)協(xié)議,協(xié)議如下

type Msg struct {
   Id int64
   TopicLen int64
   Topic string
   // 1-consumer 2-producer 3-comsumer-ack 4-error
   MsgType int64 // 消息類型
   Len int64 // 消息長(zhǎng)度
   Payload []byte // 消息
}

Payload使用字節(jié)數(shù)組,是因?yàn)椴还軘?shù)據(jù)是什么,只當(dāng)做字節(jié)數(shù)組來(lái)處理即可。Msg承載著生產(chǎn)者生產(chǎn)的消息,消費(fèi)者消費(fèi)的消息,ACK、和錯(cuò)誤消息,前兩者會(huì)有負(fù)載,而后兩者負(fù)載和長(zhǎng)度都為空

協(xié)議的編解碼處理,就是對(duì)字節(jié)的處理,接下來(lái)有從字節(jié)轉(zhuǎn)為Msg,和從Msg轉(zhuǎn)為字節(jié)兩個(gè)函數(shù)

func BytesToMsg(reader io.Reader) Msg {

   m := Msg{}
   var buf [128]byte
   n, err := reader.Read(buf[:])
   if err != nil {
      fmt.Println("read failed, err:", err)
   }
   fmt.Println("read bytes:", n)
   // id
   buff := bytes.NewBuffer(buf[0:8])
   binary.Read(buff, binary.LittleEndian, &m.Id)
   // topiclen
   buff = bytes.NewBuffer(buf[8:16])
   binary.Read(buff, binary.LittleEndian, &m.TopicLen)
   // topic
   msgLastIndex := 16 + m.TopicLen
   m.Topic = string(buf[16: msgLastIndex])
   // msgtype
   buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
   binary.Read(buff, binary.LittleEndian, &m.MsgType)

   buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
   binary.Read(buff, binary.LittleEndian, &m.Len)

   if m.Len <= 0 {
      return m
   }

   m.Payload = buf[msgLastIndex + 16:]
   return m
}

func MsgToBytes(msg Msg) []byte {
   msg.TopicLen = int64(len([]byte(msg.Topic)))
   msg.Len = int64(len([]byte(msg.Payload)))

   var data []byte
   buf := bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Id)
   data = append(data, buf.Bytes()...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.TopicLen)
   data = append(data, buf.Bytes()...)
   
   data = append(data, []byte(msg.Topic)...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.MsgType)
   data = append(data, buf.Bytes()...)
   
   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Len)
   data = append(data, buf.Bytes()...)
   data = append(data, []byte(msg.Payload)...)

   return data
}

隊(duì)列

使用container/list,實(shí)現(xiàn)先入先出,生產(chǎn)者在隊(duì)尾寫,消費(fèi)者在隊(duì)頭讀取

package broker

import (
   "container/list"
   "sync"
)

type Queue struct {
   len int
   data list.List
}

var lock sync.Mutex

func (queue *Queue) offer(msg Msg) {
   queue.data.PushBack(msg)
   queue.len = queue.data.Len()
}

func (queue *Queue) poll() Msg{
   if queue.len == 0 {
      return Msg{}
   }
   msg := queue.data.Front()
   return msg.Value.(Msg)
}

func (queue *Queue) delete(id int64) {
   lock.Lock()
   for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
      if msg.Value.(Msg).Id == id {
         queue.data.Remove(msg)
         queue.len = queue.data.Len()
         break
      }
   }
   lock.Unlock()
}

方法offer往隊(duì)列里插入數(shù)據(jù),poll從隊(duì)列頭讀取數(shù)據(jù)素,delete根據(jù)消息ID從隊(duì)列刪除數(shù)據(jù)。這里使用Queue結(jié)構(gòu)體對(duì)List進(jìn)行封裝,其實(shí)是有必要的,List作為底層的數(shù)據(jù)結(jié)構(gòu),我們希望隱藏更多的底層操作,只給客戶提供基本的操作
delete操作是在消費(fèi)者消費(fèi)成功且發(fā)送ACK后,對(duì)消息從隊(duì)列里移除的,因?yàn)橄M(fèi)者可以多個(gè)同時(shí)消費(fèi),所以這里進(jìn)入臨界區(qū)時(shí)加鎖(em,加鎖是否就一定會(huì)影響對(duì)性能有較大的影響呢)

broker

broker作為服務(wù)器角色,負(fù)責(zé)接收連接,接收和響應(yīng)請(qǐng)求

package broker

import (
   "bufio"
   "net"
   "os"
   "sync"
   "time"
)

var topics = sync.Map{}

func handleErr(conn net.Conn)  {
   defer func() {
      if err := recover(); err != nil {
         println(err.(string))
         conn.Write(MsgToBytes(Msg{MsgType: 4}))
      }
   }()
}

func Process(conn net.Conn) {
   handleErr(conn)
   reader := bufio.NewReader(conn)
   msg := BytesToMsg(reader)
   queue, ok := topics.Load(msg.Topic)
   var res Msg
   if msg.MsgType == 1 {
      // comsumer
      if queue == nil || queue.(*Queue).len == 0{
         return
      }
      msg = queue.(*Queue).poll()
      msg.MsgType = 1
      res = msg
   } else if msg.MsgType == 2 {
      // producer
      if ! ok {
         queue = &Queue{}
         queue.(*Queue).data.Init()
         topics.Store(msg.Topic, queue)
      }
      queue.(*Queue).offer(msg)
      res = Msg{Id: msg.Id, MsgType: 2}
   } else if msg.MsgType == 3 {
      // consumer ack
      if queue == nil {
         return
      }
      queue.(*Queue).delete(msg.Id)

   }
   conn.Write(MsgToBytes(res))

}

MsgType等于1時(shí),直接消費(fèi)消息;MsgType等于2時(shí)是生產(chǎn)者生產(chǎn)消息,如果隊(duì)列為空,那么還需創(chuàng)建一個(gè)新的隊(duì)列,放在對(duì)應(yīng)的topic下;MsgType等于3時(shí),代表消費(fèi)者成功消費(fèi),可以

刪除消息

我們說(shuō)消息不丟失,這里實(shí)現(xiàn)不完全,我就實(shí)現(xiàn)了持久化(持久化也沒(méi)全部實(shí)現(xiàn))。思路就是該topic對(duì)應(yīng)的隊(duì)列里的消息,按協(xié)議格式進(jìn)行序列化,當(dāng)broker啟動(dòng)時(shí),從文件恢復(fù)
持久化需要考慮的是增量還是全量,需要保存多久,這些都會(huì)影響實(shí)現(xiàn)的難度和性能(想想Kafka和Redis的持久化),這里表示簡(jiǎn)單實(shí)現(xiàn)就好:定時(shí)器定時(shí)保存

func Save()  {
   ticker := time.NewTicker(60)
   for {
      select {
      case <-ticker.C:
         topics.Range(func(key, value interface{}) bool {
            if value == nil {
               return false
            }
            file, _ := os.Open(key.(string))
            if file == nil {
               file, _ = os.Create(key.(string))
            }
            for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
               file.Write(MsgToBytes(msg.Value.(Msg)))
            }
            _ := file.Close()
            return false
         })
      default:
         time.Sleep(1)
      }
   }
}

有一個(gè)問(wèn)題是,當(dāng)上面的delete操作時(shí),這里的file文件需不需要跟著delete掉對(duì)應(yīng)的消息?答案是需要?jiǎng)h除的,如果不刪除,只能等下一次的全量持久化來(lái)覆蓋了,中間就有臟數(shù)據(jù)問(wèn)題
下面是啟動(dòng)邏輯

package main

import (
   "awesomeProject/broker"
   "fmt"
   "net"
)

func main()  {
   listen, err := net.Listen("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("listen failed, err:", err)
      return
   }
   go broker.Save()
   for {
      conn, err := listen.Accept()
      if err != nil {
         fmt.Print("accept failed, err:", err)
         continue
      }
      go broker.Process(conn)

   }
}

生產(chǎn)者

package main

import (
   "awesomeProject/broker"
   "fmt"
   "net"
)

func produce() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
   }
   defer conn.Close()

   msg := broker.Msg{Id: 1102, Topic: "topic-test",  MsgType: 2,  Payload: []byte("我")}
   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Print("write failed, err:", err)
   }

   fmt.Print(n)
}

消費(fèi)者

package main

import (
   "awesomeProject/broker"
   "bytes"
   "fmt"
   "net"
)

func comsume() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
   }
   defer conn.Close()

   msg := broker.Msg{Topic: "topic-test",  MsgType: 1}

   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Println("write failed, err:", err)
   }
   fmt.Println("n", n)

   var res [128]byte
   conn.Read(res[:])
   buf := bytes.NewBuffer(res[:])
   receMsg := broker.BytesToMsg(buf)
   fmt.Print(receMsg)

   // ack
   conn, _ = net.Dial("tcp", "127.0.0.1:12345")
   l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
   if e != nil {
      fmt.Println("write failed, err:", err)
   }
   fmt.Println("l:", l)
}

消費(fèi)者這里ack時(shí)重新創(chuàng)建了連接,如果不創(chuàng)建連接的話,那服務(wù)端那里就需要一直從conn讀取數(shù)據(jù),直到結(jié)束。思考一下,像RabbitMQ的ack就有自動(dòng)和手工的ack,如果是手工的ack,必然需要一個(gè)新的連接,因?yàn)椴恢揽蛻舳耸裁磿r(shí)候發(fā)送ack,自動(dòng)的話,當(dāng)然可以使用同一個(gè)連接,but這里就簡(jiǎn)單創(chuàng)建一條新連接吧

啟動(dòng)

先啟動(dòng)broker,再啟動(dòng)producer,然后啟動(dòng)comsumer,OK,能跑,能實(shí)現(xiàn)發(fā)送消息到隊(duì)列,從隊(duì)列消費(fèi)消息

總結(jié)

整體雖然簡(jiǎn)單,但畢竟是使用go實(shí)現(xiàn)的,就是看似一頓操作猛如虎,實(shí)質(zhì)慌如狗。第一時(shí)間就被go的gopath和go mod困擾住,后面語(yǔ)法的使用,比如指針,傳值傳引用等,最頭疼的就是類型轉(zhuǎn)換,作為一個(gè)javer,使用go進(jìn)行類型轉(zhuǎn)換,著實(shí)被狠狠得虐了一番。

到此這篇關(guān)于使用go實(shí)現(xiàn)一個(gè)超級(jí)mini的消息隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)go mini消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!?

相關(guān)文章

  • Go語(yǔ)言中反射的正確使用

    Go語(yǔ)言中反射的正確使用

    Go本身不支持模板,因此在以往需要使用模板的場(chǎng)景下往往就需要使用反射(reflect). 反射使用多了以后會(huì)容易上癮,有些人甚至?xí)纬梢环N莫名其妙的鄙視鏈。下面這篇文章就給大家介紹了如何正確使用Go語(yǔ)言中的反射以及在使用前的注意,有需要的朋友們下面來(lái)一起看看吧。
    2016-12-12
  • GOPROXY:解決go get golang.org/x包失敗問(wèn)題

    GOPROXY:解決go get golang.org/x包失敗問(wèn)題

    這篇文章主要介紹了GOPROXY:解決go get golang.org/x包失敗問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • Go語(yǔ)言并發(fā)技術(shù)詳解

    Go語(yǔ)言并發(fā)技術(shù)詳解

    這篇文章主要介紹了Go語(yǔ)言并發(fā)技術(shù)詳解,本文講解了goroutine、channels、Buffered Channels、Range和Close等內(nèi)容,需要的朋友可以參考下
    2014-10-10
  • goland服務(wù)熱重啟的配置文件

    goland服務(wù)熱重啟的配置文件

    這篇文章主要介紹了goland服務(wù)熱重啟的配置文件,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-12-12
  • 詳解Go語(yǔ)言中調(diào)度器的原理與使用

    詳解Go語(yǔ)言中調(diào)度器的原理與使用

    這篇文章主要介紹了Go語(yǔ)言運(yùn)行時(shí)調(diào)度器的實(shí)現(xiàn)原理,其中包含調(diào)度器的設(shè)計(jì)與實(shí)現(xiàn)原理、演變過(guò)程以及與運(yùn)行時(shí)調(diào)度相關(guān)的數(shù)據(jù)結(jié)構(gòu),希望對(duì)大家有所幫助
    2023-07-07
  • 一文帶你揭秘Go中new()和make()函數(shù)的區(qū)別和用途

    一文帶你揭秘Go中new()和make()函數(shù)的區(qū)別和用途

    Go(或 Golang)是一種現(xiàn)代、靜態(tài)類型、編譯型的編程語(yǔ)言,專為構(gòu)建可擴(kuò)展、并發(fā)和高效的軟件而設(shè)計(jì),它提供了各種內(nèi)置的函數(shù)和特性,幫助開(kāi)發(fā)人員編寫簡(jiǎn)潔高效的代碼,在本博客文章中,我們將探討 new() 和 make() 函數(shù)之間的區(qū)別,了解何時(shí)以及如何有效地使用它們
    2023-10-10
  • golang微服務(wù)框架kratos實(shí)現(xiàn)Socket.IO服務(wù)的方法

    golang微服務(wù)框架kratos實(shí)現(xiàn)Socket.IO服務(wù)的方法

    本文主要介紹了golang微服務(wù)框架kratos實(shí)現(xiàn)Socket.IO服務(wù)的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • Golang控制通道實(shí)現(xiàn)協(xié)程等待詳解

    Golang控制通道實(shí)現(xiàn)協(xié)程等待詳解

    這篇文章主要介紹了Golang控制通道實(shí)現(xiàn)協(xié)程等待,通道是Go語(yǔ)言程序的并發(fā)體goroutine是它們之間的通信機(jī)制。一個(gè)通道是一個(gè)通信機(jī)制,它可以讓一個(gè)goroutine通過(guò)它給另一個(gè)goroutine發(fā)送值信息。每個(gè)通道都有一個(gè)特殊的類型,也就是channels可發(fā)送數(shù)據(jù)的類型
    2022-11-11
  • Go?1.21.0?新增結(jié)構(gòu)化日志記錄標(biāo)準(zhǔn)庫(kù)log/slog使用詳解

    Go?1.21.0?新增結(jié)構(gòu)化日志記錄標(biāo)準(zhǔn)庫(kù)log/slog使用詳解

    這篇文章主要為大家介紹了Go?1.21.0?新增結(jié)構(gòu)化日志記錄標(biāo)準(zhǔn)庫(kù)log/slog使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-11-11
  • golang連接sqlx庫(kù)的操作使用指南

    golang連接sqlx庫(kù)的操作使用指南

    這篇文章主要為大家介紹了golang連接sqlx庫(kù)的操作使用指南,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪
    2022-04-04

最新評(píng)論