亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

詳解如何用Golang處理每分鐘100萬個請求

 更新時間:2023年04月11日 11:40:36   作者:janrs_com  
在項目開發(fā)中,我們常常會遇到處理來自數(shù)百萬個端點的大量POST請求,本文主要介紹了Golang實現(xiàn)處理每分鐘100萬個請求的方法,希望對大家有所幫助

面臨的問題

在我設計一個分析系統(tǒng)中,我們公司的目標是能夠處理來自數(shù)百萬個端點的大量POST請求。web 網(wǎng)絡處理程序?qū)⑹盏揭粋€JSON文檔,其中可能包含許多有效載荷的集合,需要寫入Amazon S3,以便我們的地圖還原系統(tǒng)隨后對這些數(shù)據(jù)進行操作。

傳統(tǒng)上,我們會研究創(chuàng)建一個工人層架構(gòu),利用諸如以下東西:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • 還有等等其他的技術(shù)手段...

并設置 2 個不同的集群,一個用于 Web 前端,另一個用于 worker 處理進程,這樣我們就可以擴大我們可以處理的后臺工作量。

但從一開始,我們的團隊就知道我們應該在 Go 中這樣做,因為在討論階段我們看到這可能是一個非常大的流量系統(tǒng)。 我使用 Go 已有大約 2 年左右的時間,我們公司在處理業(yè)務時開發(fā)了一些系統(tǒng),但沒有一個能承受如此大的負載。以下是優(yōu)化的過程。

我們首先創(chuàng)建一些結(jié)構(gòu)體來定義我們將通過 POST 調(diào)用接收的 Web 請求負載,以及一種將其上傳到我們的 S3 存儲桶的方法。代碼如下:

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // ...負載字段
}

func (p *Payload) UploadToS3() error {
    // storageFolder 方法確保在我們在鍵名中獲得相同時間戳時不會發(fā)生名稱沖突
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // 我們發(fā)布到 S3 存儲桶的所有內(nèi)容都應標記為“私有”
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

使用 Go 協(xié)程

最初我們采用了一個非常簡單的 POST 處理程序?qū)崿F(xiàn),只是試圖將job 處理程序并行化到一個簡單的 goroutine 中:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 將body讀入字符串進行json解碼
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }
    
    // 分別檢查每個有效負載和隊列項目以發(fā)布到 S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- 這是不建議的做法。這里是最開始的做法。
    }

    w.WriteHeader(http.StatusOK)
}

對于中等負載,這可能適用于大多數(shù)公司的流量,但很快證明這在大規(guī)模情況下效果不佳。 我們期望有很多請求,但沒有達到我們將第一個版本部署到生產(chǎn)環(huán)境時開始看到的數(shù)量級。 我們完全低估了流量。

上面的方法在幾個不同的方面是不好的。 無法控制我們生成了多少個 go routines。 由于我們每分鐘收到 100 萬個 POST 請求,因此這段代碼很快崩潰了。

進一步優(yōu)化

我們需要找到一種不同的方式。 從一開始我們就開始討論我們需要如何保持請求處理程序的生命周期非常短,并在后臺進行生成處理。 當然,這是你在使用 Ruby on Rails 時必須做的,否則你將阻止所有可用的 worker web 處理器,無論你使用的是 puma、unicorn 還是 passenger(請不要進入 JRuby 討論)。 然后我們需要利用常見的解決方案來做到這一點,例如 Resque、Sidekiq、SQS 等等,有很多方法可以實現(xiàn)這一點。

所以第二次迭代是創(chuàng)建一個緩沖通道,我們可以創(chuàng)建一些隊列,然后把 job push到隊列并將它們上傳到 S3,并且由于我們可以控制job 隊列中的最大數(shù)數(shù)量并且我們有足夠的內(nèi)存來處理隊列中的 job。在這個方案中,我們認為只需要在通道隊列中緩沖需要處理的 job 就可以了。

代碼如下:

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // 分別檢查每個有效負載和隊列項目以發(fā)布到 S3
    for _, payload := range content.Payloads {
        Queue <- payload // <----- 這是建議的做法。
    }
    ...
}

然后為了實際出列作業(yè)并處理它們,我們使用了類似的東西:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- 這里雖然優(yōu)化了,但還不是最好的。
        }
    }
}

在上面的代碼中,我們用一個緩沖隊列來交換有缺陷的并發(fā)性,而緩沖隊列只是推遲了問題。 我們的同步處理器一次只將一個有效負載上傳到 S3,并且由于傳入請求的速率遠遠大于單個處理器上傳到 S3 的能力,我們的 job 緩沖通道很快達到了極限并阻止了請求處理程序的能力,隊列很快就阻塞滿了。

我們只是在避免這個問題,并開始倒計時,直到我們的系統(tǒng)最終死亡。 在我們部署這個有缺陷的版本后,我們的延遲率在幾分鐘內(nèi)以恒定的速度持續(xù)增加。以下是延遲率增長圖:

更好的解決方案

我們決定在使用 Go 通道時使用一種通用模式,以創(chuàng)建一個 2 層通道系統(tǒng),一個用于 Job 隊列,另一個用于控制同時在 Job 隊列上操作的 Worker 的數(shù)量。

這個想法是將上傳到 S3 的數(shù)據(jù)并行化到某種程度上可持續(xù)的速度,這種速度既不會削弱機器也不會開始從 S3 生成連接錯誤。 所以我們選擇創(chuàng)建 Job/Worker 模式。 對于那些熟悉 Java、C# 等的人來說,可以將其視為 Golang 使用通道實現(xiàn) Worker 線程池的方式。

代碼如下:

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job 表示要運行的作業(yè)
type Job struct {
    Payload Payload
}

// 我們可以在 Job 隊列上發(fā)送工作請求的緩沖通道。
var JobQueue chan Job

// Worker 代表執(zhí)行作業(yè)的 Worker。
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start 方法為 Worker 啟動循環(huán)監(jiān)聽。監(jiān)聽退出信號以防我們需要停止它。
func (w Worker) Start() {
    go func() {
        for {
            // 將當前 woker 注冊到工作隊列中。
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // 接收 work 請求。
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // 接收一個退出的信號。
                return
            }
        }
    }()
}

// 將退出信號傳遞給 Worker 進程以停止處理清理。
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

我們已經(jīng)修改了我們的 Web 請求處理程序,以創(chuàng)建一個帶有有效負載的 Job 結(jié)構(gòu)實例,并將其發(fā)送到 JobQueue 通道以供 Worker 提取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // 將body讀入字符串進行json解碼
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // 分別檢查每個有效負載和隊列項目以發(fā)布到 S3
    for _, payload := range content.Payloads {

        // 創(chuàng)建一個有效負載的job
        work := Job{Payload: payload}

        // 將 work push 到隊列。
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在我們的 Web 服務器初始化期間,我們創(chuàng)建一個 Dispatcher 調(diào)度器并調(diào)用 Run() 來創(chuàng)建 Woker 工作池并開始偵聽將出現(xiàn)在 Job 隊列中的 Job。

dispatcher := NewDispatcher(MaxWorker) 
dispatcher.Run()

下面是我們的調(diào)度程序?qū)崿F(xiàn)的代碼:

type Dispatcher struct {
    // 通過調(diào)度器注冊一個 Worker 通道池
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // 啟動指定數(shù)量的 Worker
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.pool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // 接收一個 job 請求
            go func(job Job) {
                // 嘗試獲取可用的 worker job 通道
                // 這將阻塞 worker 直到空閑
                jobChannel := <-d.WorkerPool

                // 調(diào)度一個 job 到 worker job 通道
                jobChannel <- job
            }(job)
        }
    }
}

請注意,我們提供了要實例化并添加到我們的 Worker 池中的最大worker 數(shù)量。 由于我們在這個項目中使用了 Amazon Elasticbeanstalk 和 dockerized Go 環(huán)境,因此我們從環(huán)境變量中讀取這些值。 這樣我們就可以控制 Job 隊列的數(shù)量和最大大小,因此我們可以快速調(diào)整這些值而無需重新部署集群。

var ( 
  MaxWorker = os.Getenv("MAX_WORKERS")
  MaxQueue  = os.Getenv("MAX_QUEUE")
)

在我們部署它之后,我們立即看到我們所有的延遲率都下降到極低的延遲,并且我們處理請求的能力急劇上升。以下是流量截圖:

在我們的彈性負載均衡器完全預熱幾分鐘后,我們看到我們的 ElasticBeanstalk 應用程序每分鐘處理近 100 萬個請求。 我們通常在早上有幾個小時的流量會飆升至每分鐘超過一百萬。

一旦我們部署了新代碼,服務器數(shù)量就從 100 臺服務器大幅下降到大約 20 臺服務器。以下是服務器數(shù)量變化截圖:

在正確配置集群和自動縮放設置后,我們能夠?qū)⑵溥M一步降低到僅 4x EC2 c4.Large 實例,并且如果 CPU 使用率超過 90% 持續(xù) 5 天,Elastic Auto-Scaling 將生成一個新實例 分鐘值。以下是截圖:

總結(jié)

可以看出利用 Elasticbeanstalk 自動縮放的強大功能以及 Golang 提供的開箱即用的高效和簡單的并發(fā)方法,就可以構(gòu)建出一個高性能的處理程序。

以上就是詳解如何用Golang處理每分鐘100萬個請求的詳細內(nèi)容,更多關(guān)于Golang處理請求的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Golang實現(xiàn)Java虛擬機之解析class文件詳解

    Golang實現(xiàn)Java虛擬機之解析class文件詳解

    這篇文章主要為大家詳細介紹了Golang實現(xiàn)Java虛擬機之解析class文件的相關(guān)知識,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下
    2024-01-01
  • Go 常量基礎概念(聲明更改只讀)

    Go 常量基礎概念(聲明更改只讀)

    這篇文章主要為大家介紹了Go常量基礎概念包括常量的聲明更改只讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • 超全講解Golang中defer關(guān)鍵字的用法

    超全講解Golang中defer關(guān)鍵字的用法

    本文將從一個資源回收問題引入,引出defer關(guān)鍵字,并對其進行基本介紹,從而讓大家對Go語言中的defer有更深入的了解,需要的小伙伴可以學習一下
    2023-05-05
  • Go語言如何實現(xiàn)TCP通信詳解

    Go語言如何實現(xiàn)TCP通信詳解

    go里面實現(xiàn)tcp沒有像之前寫的C++那些那么麻煩,在C++里面要先創(chuàng)建套接字,然后綁定ip地址,go里面直接就一個函數(shù)建立套接字,然后在進行通信就可以了,下面這篇文章主要給大家介紹了關(guān)于Go語言如何實現(xiàn)TCP通信的相關(guān)資料,需要的朋友可以參考下
    2023-01-01
  • 利用Golang解析json數(shù)據(jù)的方法示例

    利用Golang解析json數(shù)據(jù)的方法示例

    Go提供了原生的JSON庫,并且與語言本身有效的集成在了一起。下面這篇文章將給大家介紹關(guān)于利用Golang解析json數(shù)據(jù)的方法,文中給出了詳細的示例代碼供大家參考學習,需要的朋友們下面跟著小編來一起學習學習吧。
    2017-07-07
  • 一文帶你了解Go語言中time包的時間常用操作

    一文帶你了解Go語言中time包的時間常用操作

    在日常開發(fā)中,我們避免不了時間的使用,我們可能需要獲取當前時間,然后格式化保存,也可能需要在時間類型與字符串類型之間相互轉(zhuǎn)換等。本文將會對?Go?time?包里面的常用函數(shù)和方法進行介紹,需要的可以參考一下
    2022-12-12
  • 一文帶你深入了解Go語言中的事務

    一文帶你深入了解Go語言中的事務

    事務中止時,你結(jié)束事務了嗎?在開發(fā)時有可能就會犯這樣的錯誤,其問題就是你在提交事務時,如果中間有其他業(yè)務就取消操作,那么事務也關(guān)閉了嗎?本文就來詳細講講
    2023-04-04
  • go語言實現(xiàn)十大常見的排序算法示例

    go語言實現(xiàn)十大常見的排序算法示例

    這篇文章主要為大家介紹了go語言實現(xiàn)十大常見的排序算法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • go結(jié)構(gòu)體嵌套的切片數(shù)組操作

    go結(jié)構(gòu)體嵌套的切片數(shù)組操作

    這篇文章主要介紹了go結(jié)構(gòu)體嵌套的切片數(shù)組操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • 如何讓shell終端和goland控制臺輸出彩色的文字

    如何讓shell終端和goland控制臺輸出彩色的文字

    這篇文章主要介紹了如何讓shell終端和goland控制臺輸出彩色的文字的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05

最新評論