亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

使用Go基于WebSocket構(gòu)建千萬(wàn)級(jí)視頻直播彈幕系統(tǒng)的代碼詳解

 更新時(shí)間:2022年04月28日 17:04:26   作者:BlueMiaomiao  
這篇文章主要介紹了使用Go基于WebSocket構(gòu)建千萬(wàn)級(jí)視頻直播彈幕系統(tǒng),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

(1)業(yè)務(wù)復(fù)雜度介紹

開門見山,假設(shè)一個(gè)直播間同時(shí)500W人在線,那么1秒鐘1000條彈幕,那么彈幕系統(tǒng)的推送頻率就是: 500W * 1000條/秒=50億條/秒 ,想想B站2019跨年晚會(huì)那次彈幕系統(tǒng)得是多么的NB,況且一個(gè)大型網(wǎng)站不可能只有一個(gè)直播間!

使用Go做WebSocket開發(fā)無非就是三種情況:

  • 使用Go原生自帶的庫(kù),也就是 golang.org/x/net ,但是這個(gè)官方庫(kù)真是出了奇Bug多
  • 使用GitHub大佬 gorilla/websocket 庫(kù),可以結(jié)合到某些Web開發(fā)框架,比如Gin、iris等,只要使用的框架式基于 golang.org/net 的,那么這個(gè)庫(kù)就可以與這個(gè)框架結(jié)合
  • 手?jǐn)]一個(gè)WebSocket框架

根據(jù)估算結(jié)果,彈幕推送量很大的時(shí)候,Linux內(nèi)核將會(huì)出現(xiàn)瓶頸,因?yàn)長(zhǎng)inux內(nèi)核發(fā)送TCP包的時(shí)候極限包發(fā)送頻率是100W。因此可以將同一秒內(nèi)的彈幕消息合并為1條推送,減少網(wǎng)絡(luò)小數(shù)據(jù)包的發(fā)送,從而降低推送頻率。

彈幕系統(tǒng)需要維護(hù)在線的用戶長(zhǎng)連接來實(shí)現(xiàn)定向推送到在線的用戶,通常是使用Hash字典結(jié)構(gòu),通常推送消息就是遍歷在線用的Hash字典。在彈幕推送期間用戶在不斷的上下線,為了維護(hù)上線用戶,那么就得不斷的修改Hash字典,不斷地進(jìn)行鎖操作,用戶量過大導(dǎo)致鎖瓶頸。因此可以將整個(gè)Hash結(jié)構(gòu)拆分為多個(gè)Hash結(jié)構(gòu),分別對(duì)多個(gè)Hash結(jié)構(gòu)加不同的鎖,并且使用讀寫鎖替代互斥鎖。

通常服務(wù)器與客戶端交互使用JSON結(jié)構(gòu),那么需要不斷的編碼解碼JSON數(shù)據(jù),這將會(huì)導(dǎo)致CPU瓶頸。將消息先進(jìn)行合并,然后進(jìn)行編碼,最后輪詢Hash結(jié)構(gòu)進(jìn)行推送。

以上是單體架構(gòu)存在的問題,為了支持更多的用戶負(fù)載,通常彈幕系統(tǒng)采用分布式架構(gòu),進(jìn)行彈性擴(kuò)容縮容。

(2)推送還是拉???

如果是客戶端拉取服務(wù)器端數(shù)據(jù),那么將會(huì)存在以下幾個(gè)問題:

  • 直播在線人數(shù)多就意味著消息數(shù)據(jù)更新頻率高,拉取消息意味著彈幕無法滿足時(shí)效性
  • 如果很多客戶端同時(shí)拉取,那么服務(wù)器端的壓力無異于DDOS
  • 一個(gè)彈幕系統(tǒng)應(yīng)該是通用的,因此對(duì)于直播間彈幕較少的場(chǎng)景,意味著消息數(shù)據(jù)拉取請(qǐng)求都是無效的

因此我們考慮推送模式:當(dāng)數(shù)據(jù)發(fā)生更新的時(shí)候服務(wù)器端主動(dòng)推送到客戶端,這樣可以有效減少客戶端的請(qǐng)求次數(shù)。如果需要實(shí)現(xiàn)消息推送,那么就意味著服務(wù)器端維護(hù)大量的長(zhǎng)連接。

(3)為什么使用WebSocket?

實(shí)現(xiàn)彈幕消息的實(shí)時(shí)更新一定是使用Socket的方式,那么為啥要使用WebSocket呢?現(xiàn)在大部分直播應(yīng)用的開發(fā)都是跨平臺(tái)的,然而跨平臺(tái)的開發(fā)框架本質(zhì)就是Web開發(fā),那么一定離不開WebSocket,而且一部分用戶會(huì)選擇在Web端看視頻,比如Bilibili,現(xiàn)如今也有一些桌面應(yīng)用是用Electron等跨平臺(tái)框架開發(fā)的,比如Lark飛書等,因此實(shí)現(xiàn)消息推送的最佳方案就是使用WebSocket。

使用WebSocket可以輕松的維持服務(wù)器端長(zhǎng)連接,其次WebSocket是架構(gòu)在HTTP協(xié)議之上的,并且也可以使用HTTPS方式,因此WebSocket是可靠傳輸,并且不需要開發(fā)者關(guān)注底層細(xì)節(jié)。

為啥要使用Go搞WebSocket呢?首先說到WebSocket你可能會(huì)想到Node.js,但是Node.js是單線程模型,如果實(shí)現(xiàn)高并發(fā),不得不創(chuàng)建多個(gè)Node.js進(jìn)程,但是這又不容易服務(wù)端遍歷整個(gè)連接集合;如果使用Java就會(huì)顯得比較笨重,Java項(xiàng)目的部署,編寫Dockerfile都不如Go的目標(biāo)二進(jìn)制更加簡(jiǎn)潔,并且Go協(xié)程很容易實(shí)現(xiàn)高并發(fā),上一章說到Go語(yǔ)言目前也有成熟的WebSocket輪子。

(4)服務(wù)端基本Demo

首先搭建好一個(gè)框架:

package main
import (
  "fmt"
  "net/http"
)
func main() {
 fmt.Println("Listen localhost:8080")
   // 注冊(cè)一個(gè)用于WebSocket的路由,實(shí)際業(yè)務(wù)中不可能只有一個(gè)路由
  http.HandleFunc("/messages", messageHandler)
  // 監(jiān)聽8080端口,沒有實(shí)現(xiàn)服務(wù)異常處理器,因此第二個(gè)參數(shù)是nil
  http.ListenAndServe("localhost:8080", nil)
}
func messageHandler(response http.ResponseWriter, request *http.Request) {
  // TODO: 實(shí)現(xiàn)消息處理
  response.Write([]byte("HelloWorld"))
}

然后完善messageHandler函數(shù):

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允許跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }
  // 建立連接
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }
  // 收發(fā)消息
  for {
    // 讀取消息
    _, bytes, err := conn.ReadMessage()
    if err != nil {
      _ = conn.Close()
    }
    // 寫入消息
    err = conn.WriteMessage(websocket.TextMessage, bytes)
    if err != nil {
      _ = conn.Close()
    }
  }
}

現(xiàn)在基本上實(shí)現(xiàn)了WebSocket功能,但是websocket的原生API不是線程安全的(Close方法是線程安全的,并且是可重入的),并且其他模塊無法復(fù)用業(yè)務(wù)邏輯,因此進(jìn)行封裝:

  • 封裝Connection對(duì)象描述一個(gè)WebSocket連接
  • 為Connection對(duì)象提供線程安全的關(guān)閉、接收、發(fā)送API
// main.go
package main
import (
  "bluemiaomiao.github.io/websocket-go/service"
  "fmt"
  "net/http"
  "github.com/gorilla/websocket"
)
func main() {
  fmt.Println("Listen localhost:8080")
  http.HandleFunc("/messages", messageHandler)
  _ = http.ListenAndServe("localhost:8080", nil)
}
func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允許跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }
  // 建立連接
  conn, err := upgrader.Upgrade(response, request, nil)
  wsConn, err := service.Create(conn)
  if err != nil {
    return
  }
  // 收發(fā)消息
  for {
    // 讀取消息
    msg, err := wsConn.ReadOne()
    if err != nil {
      wsConn.Close()
    }
    // 寫入消息
    err = wsConn.WriteOne(msg)
    if err != nil {
      _ = conn.Close()
    }
  }
}
// service/messsage_service.go
package service
import (
  "errors"
  "github.com/gorilla/websocket"
  "sync"
)
// 封裝的連接對(duì)象
// 
// 由于websocket的Close()方法是可重入的,所以可以多次調(diào)用,但是關(guān)閉Channel的close()
// 方法不是可重入的,因此通過isClosed進(jìn)行判斷
// isClosed可能發(fā)生資源競(jìng)爭(zhēng),因此通過互斥鎖避免
// 關(guān)閉websocket連接后,也要自動(dòng)關(guān)閉輸入輸出消息流,因此通過signalCloseLoopChan實(shí)現(xiàn)
type Connection struct {
  conn                  *websocket.Conn  // 具體的連接對(duì)象
  inputStream             chan []byte       // 輸入流,使用Channel模擬
  outputStream           chan []byte       // 輸出流,使用chaneel模擬
  signalCloseLoopChan     chan byte       // 關(guān)閉信號(hào)
  isClosed               bool            // 是否調(diào)用過close()方法
  lock                   sync.Mutex      // 簡(jiǎn)單的鎖
}
// 用于初始化一個(gè)連接對(duì)象
func Create(conn *websocket.Conn) (connection *Connection, err error) {
  connection = &Connection{
    conn:              conn,
    inputStream:        make(chan []byte, 1000),
    outputStream:       make(chan []byte, 1000),
    signalCloseLoopChan: make(chan byte, 1),
    isClosed:            false,
  }
  // 啟動(dòng)讀寫循環(huán)
  go connection.readLoop()
  go connection.writeLoop()
  return
}
// 讀取一條消息
func (c *Connection) ReadOne() (msg []byte, err error) {
  select {
  case msg = <-(*c).inputStream:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}
// 寫入一條消息
func (c *Connection) WriteOne(msg []byte) (err error) {
  select {
  case (*c).outputStream <- msg:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}
// 關(guān)閉連接對(duì)象
func (c *Connection) Close() {
  _ = (*c).conn.Close()
  (*c).lock.Lock()
  if !(*c).isClosed {
    close((*c).signalCloseLoopChan)
  }
  (*c).lock.Unlock()
}
// 讀取循環(huán)
func (c *Connection) readLoop() {
  // 不停的讀取長(zhǎng)連接中的消息,只要存在消息就將其放到隊(duì)列中
  for {
    _, bytes, err := (*c).conn.ReadMessage()
    if err != nil {
      (*c).Close()
    }
    select {
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    case (*c).inputStream <- bytes:
    }
  }
}
// 寫入循環(huán)
func (c *Connection) writeLoop() {
  // 只要隊(duì)列中存在消息,就將其寫入
  var data []byte
  for {
    select {
    case data = <-(*c).outputStream:
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    }
    err := (*c).conn.WriteMessage(websocket.TextMessage, data)
    if err != nil {
      _ = (*c).conn.Close()
    }
  }
}

至此,你已經(jīng)學(xué)會(huì)了如何使用Go構(gòu)建WebSocket服務(wù)。

到此這篇關(guān)于使用Go基于WebSocket構(gòu)建千萬(wàn)級(jí)視頻直播彈幕系統(tǒng)的代碼詳解的文章就介紹到這了,更多相關(guān)go WebSocket視頻直播彈幕內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • golang使用swagger的過程詳解

    golang使用swagger的過程詳解

    這篇文章主要介紹了golang使用swagger的過程詳解,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-06-06
  • Golang中錯(cuò)誤處理機(jī)制詳解

    Golang中錯(cuò)誤處理機(jī)制詳解

    平時(shí)在項(xiàng)目開發(fā)過程中少不了對(duì)錯(cuò)誤的處理,一個(gè)好用的系統(tǒng)首先要確保其健壯性,不能經(jīng)常發(fā)生錯(cuò)誤就卡死之類的情況,為了讓我們的程序更加健壯,我們就需要知道golang里的錯(cuò)誤處理機(jī)制是怎么樣的,這篇文章帶大家一起學(xué)習(xí),需要的朋友跟著小編一起來看看吧
    2024-05-05
  • Go語(yǔ)言實(shí)現(xiàn)二進(jìn)制與十進(jìn)制互轉(zhuǎn)的示例代碼

    Go語(yǔ)言實(shí)現(xiàn)二進(jìn)制與十進(jìn)制互轉(zhuǎn)的示例代碼

    這篇文章主要和大家詳細(xì)介紹了Go語(yǔ)言中實(shí)現(xiàn)二進(jìn)制與十進(jìn)制互相轉(zhuǎn)換的示例代碼,文中的代碼簡(jiǎn)潔易懂,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-05-05
  • Go開發(fā)中有哪幾種無法恢復(fù)的致命場(chǎng)景分析

    Go開發(fā)中有哪幾種無法恢復(fù)的致命場(chǎng)景分析

    這篇文章主要為大家介紹了Go有哪幾種無法恢復(fù)的致命場(chǎng)景示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • Golang使用gofumpt進(jìn)行代碼格式化

    Golang使用gofumpt進(jìn)行代碼格式化

    這篇文章主要為大家詳細(xì)介紹了Golang如何使用gofumpt進(jìn)行代碼格式化,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-02-02
  • go?micro微服務(wù)proto開發(fā)安裝及使用規(guī)則

    go?micro微服務(wù)proto開發(fā)安裝及使用規(guī)則

    這篇文章主要為大家介紹了go?micro微服務(wù)proto開發(fā)中安裝Protobuf及基本規(guī)范字段的規(guī)則詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01
  • Go 常量基礎(chǔ)概念(聲明更改只讀)

    Go 常量基礎(chǔ)概念(聲明更改只讀)

    這篇文章主要為大家介紹了Go常量基礎(chǔ)概念包括常量的聲明更改只讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • 在Go中構(gòu)建并發(fā)TCP服務(wù)器

    在Go中構(gòu)建并發(fā)TCP服務(wù)器

    今天小編就為大家分享一篇關(guān)于在Go中構(gòu)建并發(fā)TCP服務(wù)器的文章,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2018-10-10
  • 簡(jiǎn)單對(duì)比一下?C語(yǔ)言?與?Go語(yǔ)言

    簡(jiǎn)單對(duì)比一下?C語(yǔ)言?與?Go語(yǔ)言

    這篇文章主要介紹了簡(jiǎn)單對(duì)比一下?C語(yǔ)言?與?Go語(yǔ)言的相關(guān)資料,需要的朋友可以參考下
    2023-08-08
  • Go編寫定時(shí)器與定時(shí)任務(wù)詳解(附第三方庫(kù)gocron用法)

    Go編寫定時(shí)器與定時(shí)任務(wù)詳解(附第三方庫(kù)gocron用法)

    當(dāng)需要每天執(zhí)行定時(shí)任務(wù)的時(shí)候就需要定時(shí)器來處理了,周期任務(wù),倒計(jì)時(shí)任務(wù),定點(diǎn)任務(wù)等,下面這篇文章主要給大家介紹了關(guān)于Go編寫定時(shí)器與定時(shí)任務(wù)的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-07-07

最新評(píng)論