亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

golang中ants協(xié)程池使用和實(shí)現(xiàn)邏輯

 更新時(shí)間:2025年07月24日 10:57:04   作者:小許cod  
本文主要介紹了golang中ants協(xié)程池使用和實(shí)現(xiàn)邏輯,實(shí)現(xiàn)了對(duì)大規(guī)模?goroutine?的調(diào)度管理、goroutine?復(fù)用,下面就來(lái)具體介紹一下,感興趣的可以了解一下

golang中g(shù)oroutine由運(yùn)行時(shí)管理,使用go關(guān)鍵字就可以方便快捷的創(chuàng)建一個(gè)goroutine,受限于服務(wù)器硬件內(nèi)存大小,如果不對(duì)goroutine數(shù)量進(jìn)行限制,會(huì)出現(xiàn)Out of Memory錯(cuò)誤。但是goroutine泄漏引發(fā)的血案,想必各位gopher都經(jīng)歷過(guò),通過(guò)協(xié)程池限制goroutine數(shù)一個(gè)有效避免泄漏的手段,但是自己手動(dòng)實(shí)現(xiàn)一個(gè)協(xié)程池,總是會(huì)兼顧不到各種場(chǎng)景,比如釋放,處理panic,動(dòng)態(tài)擴(kuò)容等。那么ants是公認(rèn)的優(yōu)秀實(shí)現(xiàn)協(xié)程池。

ants簡(jiǎn)介

ants是一個(gè)高性能的 goroutine 池,實(shí)現(xiàn)了對(duì)大規(guī)模 goroutine 的調(diào)度管理、goroutine 復(fù)用,允許使用者在開(kāi)發(fā)并發(fā)程序的時(shí)候限制 goroutine 數(shù)量,復(fù)用資源,達(dá)到更高效執(zhí)行任務(wù)的效果

功能

  • 自動(dòng)調(diào)度海量的 goroutines,復(fù)用 goroutines
  • 定期清理過(guò)期的 goroutines,進(jìn)一步節(jié)省資源
  • 提供了大量有用的接口:任務(wù)提交、獲取運(yùn)行中的 goroutine 數(shù)量、動(dòng)態(tài)調(diào)整 Pool 大小、釋放 Pool、重啟 Pool
  • 優(yōu)雅處理 panic,防止程序崩潰
  • 資源復(fù)用,極大節(jié)省內(nèi)存使用量;在大規(guī)模批量并發(fā)任務(wù)場(chǎng)景下比原生 goroutine 并發(fā)具有更高的性能
  • 非阻塞機(jī)制

1.ants庫(kù)結(jié)構(gòu)

學(xué)習(xí)一個(gè)庫(kù)先從結(jié)構(gòu)看起吧,pool、pool_func、ants初始化一個(gè)pool等操作都在這里

ants庫(kù)代碼結(jié)構(gòu)

  • pool.go提供了ants.NewPool(創(chuàng)建協(xié)程池)、Submit(task func())提交任務(wù)
  • pool_func.go使用NewPoolWithFunc(創(chuàng)建pool對(duì)象需要帶具體的函數(shù)),并且使用Invoke(args interface{})進(jìn)行調(diào)用,arg就是傳給池函數(shù)func(interface{})的參數(shù)
  • options.go使用函數(shù)選項(xiàng)模式進(jìn)行參數(shù)配置
  • ants.go給初始化默認(rèn)協(xié)程池對(duì)象defaultAntsPool(默認(rèn)的pool容量是math.MaxInt32)提供了公共函數(shù)

介紹完了主要的庫(kù)文件后,我們進(jìn)行逐個(gè)的了解,具體的使用,我們可以結(jié)合官方的使用案例進(jìn)行了解,這里就不進(jìn)行展開(kāi)了。

2.ants中Pool創(chuàng)建對(duì)象

創(chuàng)建Pool對(duì)象需調(diào)用ants.NewPool(size, options)函數(shù),返回一個(gè)pool的指針

先看Pool的接口,對(duì)我們創(chuàng)建的Pool先做個(gè)初步印象

Pool結(jié)構(gòu)體

// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
    opts := loadOptions(options...)

    if size <= 0 {
        size = -1
    }

    if expiry := opts.ExpiryDuration; expiry < 0 {
        return nil, ErrInvalidPoolExpiry
    } else if expiry == 0 {
        opts.ExpiryDuration = DefaultCleanIntervalTime
    }

    if opts.Logger == nil {
        opts.Logger = defaultLogger
    }

    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }
    if p.options.PreAlloc {
        if size == -1 {
            return nil, ErrInvalidPreAllocSize
        }
        p.workers = newWorkerArray(loopQueueType, size)
    } else {
        p.workers = newWorkerArray(stackType, 0)
    }

    p.cond = sync.NewCond(p.lock)

    // Start a goroutine to clean up expired workers periodically.
    go p.purgePeriodically()

    return p, nil
}

ants.NewPool創(chuàng)建Pool過(guò)程

  • 接收size參數(shù)作為pool的容量,如果size<=0,那么不對(duì)池子容量進(jìn)行限制
  • loadOptions對(duì)Pool的配置,比如是否阻塞模式,
  • workerCache這個(gè)sync.Pool對(duì)象的New方法,在調(diào)用sync.Pool的Get()方法時(shí),如果為nil,則返回workerCache.New()的結(jié)果
  • 是否初始化Pool是進(jìn)行內(nèi)存預(yù)分配(size > 0),來(lái)創(chuàng)建不同的worker(stack、loopQueue兩種模式)
  • 使用p.lock鎖創(chuàng)建一個(gè)條件變量
  • 開(kāi)啟一個(gè)協(xié)程定期清理過(guò)期的workers

3.ants中的PoolWithFunc

ants.PoolWithFunc創(chuàng)建PoolWithFunc和New.Pool整體的結(jié)構(gòu)很像,多了個(gè)poolFunc func(interface{})字段,也就是提交到池子的函數(shù),然后workers的類型不一樣

4.理解worker

可以查看出pool中的worker在整個(gè)流程起著很重要的作用,也就是ants中為每個(gè)任務(wù)都是由 worker 對(duì)象來(lái)處理的,每個(gè)work都會(huì)創(chuàng)建一個(gè)goroutine來(lái)處理任務(wù),ants中的worker結(jié)構(gòu)如下

type goWorker struct {
    //work的所屬者
    pool *Pool

    //任務(wù)通道,通過(guò)這個(gè)發(fā)送給goWorker
    task chan func()

    //將work放入到隊(duì)列時(shí)更新
    recycleTime time.Time
}

從ants.Pool創(chuàng)建對(duì)象Pool的過(guò)程第四步可以看出,通過(guò)newWorkerArray創(chuàng)建workers,因?yàn)閣orkerArray是個(gè)接口,有如下方法。

type workerArray interface {
    len() int
    isEmpty() bool
    insert(worker *goWorker) error
    detach() *goWorker
    retrieveExpiry(duration time.Duration) []*goWorker
    reset()
}

通過(guò)newWorkerArray,返回實(shí)現(xiàn)了workerArray接口的workerStack,這里newWorkerArray其實(shí)是用了個(gè)工廠方法來(lái)實(shí)現(xiàn)的,根據(jù)傳入的類型,并不需要知道具體實(shí)現(xiàn)了接口的結(jié)構(gòu)體,只要實(shí)現(xiàn)了workerArray接口就可以返回實(shí)現(xiàn)者的結(jié)構(gòu)體,然后調(diào)用具體的實(shí)現(xiàn)

5.提交任務(wù)Submit

Submit(task func())接收一個(gè)func作為參數(shù),將task通過(guò)通道task將類型為func的函數(shù)給到goWorker,然后調(diào)用retrieveWorker返回一個(gè)可用的worker給task

func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()

    w = p.workers.detach()
    if w != nil { // first try to fetch the worker from the queue
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        // if the worker queue is empty and we don't run out of the pool capacity,
        // then just spawn a new worker goroutine.
        p.lock.Unlock()
        spawnWorker()
    } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    retry:
        if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        p.blockingNum++
        p.cond.Wait() // block and wait for an available worker
        p.blockingNum--
        var nw int
        if nw = p.Running(); nw == 0 { // awakened by the scavenger
            p.lock.Unlock()
            if !p.IsClosed() {
                spawnWorker()
            }
            return
        }
        if w = p.workers.detach(); w == nil {
            if nw < capacity {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }

        p.lock.Unlock()
    }
    return
}

執(zhí)行過(guò)程分析:

  • spawnWorker是一個(gè)func,從p.workerCache這個(gè)sync.Pool獲取一個(gè)goWorker對(duì)象(在New.Pool中有講到),用sync.Locker上鎖
  • 調(diào)用p.workers.detach方法(前面提到p.workers實(shí)現(xiàn)了workerArray接口)
  • 如果獲取到了goWorker對(duì)象就直接返回
  • 如果worker隊(duì)列為空,并且Pool還有容量,那么調(diào)用spawnWorker,調(diào)用worker的run方法啟動(dòng)一個(gè)新的協(xié)程處理任務(wù)
  • run方法的實(shí)現(xiàn)如下,從goWorker的channel中遍歷待執(zhí)行的func(),執(zhí)行,并且在執(zhí)行完后調(diào)用revertWorker放回workers
func (w *goWorker) run() {
    w.pool.incRunning()
    go func() {

        for f := range w.task {
            if f == nil {
                return
            }
            f()
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
        }
    }()
}

6.釋放和重啟Pool

釋放和重啟Pool分別調(diào)用了Release和Reboot,這兩個(gè)函數(shù)都在ants.Pool這個(gè)文件中可以找到,具體實(shí)現(xiàn)這里做個(gè)簡(jiǎn)單說(shuō)明

  • Release調(diào)用p.workers.reset()結(jié)束loopQueue或wokerStack中的 goroutine。都是通過(guò)發(fā)送nil到goWorker的task通道中,然后重置各個(gè)字段的值
  • Reboot調(diào)用purgePeriodically,檢測(cè)到Pool關(guān)閉了就直接退出了

7.細(xì)節(jié)

task緩沖通道

下面這個(gè)是NewPool變量workerCachesyn類型sync.Pool創(chuàng)建goWorker對(duì)象的代碼

p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }

workerChanCap作為容量,這個(gè)變量定義在ants.go文件中的定義如下:

// workerChanCap determines whether the channel of a worker should be a buffered channel
    // to get the best performance. Inspired by fasthttp at
    // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
    workerChanCap = func() int {
        // Use blocking channel if GOMAXPROCS=1.
        // This switches context from sender to receiver immediately,
        // which results in higher performance (under go1.5 at least).
        if runtime.GOMAXPROCS(0) == 1 {
            return 0
        }

        // Use non-blocking workerChan if GOMAXPROCS>1,
        // since otherwise the sender might be dragged down if the receiver is CPU-bound.
        return 1
    }()

ants參考了著名的 Web框架fasthttp的實(shí)現(xiàn)。當(dāng)GOMAXPROCS為 1時(shí)(即操作系統(tǒng)線程數(shù)為1),向通道task發(fā)送會(huì)掛起發(fā)送 goroutine,將執(zhí)行流程轉(zhuǎn)向接收goroutine,這能提升接收處理性能。如果GOMAXPROCS大于1,ants使用帶緩沖的通道,為了防止接收 goroutine 是 CPU密集的,導(dǎo)致發(fā)送 goroutine 被阻塞。

自旋鎖 SpinLock

在NewPool中l(wèi)ock,其實(shí)給lock初始化了一個(gè)自旋鎖,這里是利用atomic.CompareAndSwapUint32()這個(gè)原子操作實(shí)現(xiàn)的,在加鎖失敗后不會(huì)等待,而是繼續(xù)嘗試,提高了加鎖減鎖的性能

在開(kāi)發(fā)中剛好遇到需要ants,這次也做個(gè)記錄作為分享,其實(shí)慢慢的會(huì)發(fā)現(xiàn)三方庫(kù)的xx_test用例是最好的學(xué)習(xí)例子,希望能和大家一起知其然知其所以然,加油!

到此這篇關(guān)于golang中ants協(xié)程池使用和實(shí)現(xiàn)邏輯的文章就介紹到這了,更多相關(guān)golang ants協(xié)程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 細(xì)說(shuō)Go語(yǔ)言中空結(jié)構(gòu)體的奇妙用途

    細(xì)說(shuō)Go語(yǔ)言中空結(jié)構(gòu)體的奇妙用途

    Go語(yǔ)言中,我們可以定義空結(jié)構(gòu)體,即沒(méi)有任何成員變量的結(jié)構(gòu)體,使用關(guān)鍵字?struct{}?來(lái)表示。這種結(jié)構(gòu)體似乎沒(méi)有任何用處,但實(shí)際上它在?Go?語(yǔ)言中的應(yīng)用非常廣泛,本文就來(lái)詳解講講
    2023-05-05
  • goland?-sync/atomic原子操作小結(jié)

    goland?-sync/atomic原子操作小結(jié)

    這篇文章主要介紹了goland?-sync/atomic原子操作,原子操作能夠保證執(zhí)行期間是連續(xù)且不會(huì)被中斷(變量不會(huì)被其他修改,mutex可能存在被其他修改的情況),本文給大家介紹的非常詳細(xì),需要的朋友參考下
    2022-08-08
  • Go語(yǔ)言數(shù)據(jù)結(jié)構(gòu)之希爾排序示例詳解

    Go語(yǔ)言數(shù)據(jù)結(jié)構(gòu)之希爾排序示例詳解

    這篇文章主要為大家介紹了Go語(yǔ)言數(shù)據(jù)結(jié)構(gòu)之希爾排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • Golang中g(shù)oroutine和channel使用介紹深入分析

    Golang中g(shù)oroutine和channel使用介紹深入分析

    一次只做一件事情并不是完成任務(wù)最快的方法,一些大的任務(wù)可以拆解成若干個(gè)小任務(wù),goroutine可以讓程序同時(shí)處理幾個(gè)不同的任務(wù),goroutine使用channel來(lái)協(xié)調(diào)它們的工作,channel允許goroutine互相發(fā)送數(shù)據(jù)并同步,這樣一個(gè)goroutine就不會(huì)領(lǐng)先于另一個(gè)goroutine
    2023-01-01
  • Golang字符串變位詞示例詳解

    Golang字符串變位詞示例詳解

    這篇文章主要給大家介紹了關(guān)于GoLang字符串變位詞的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-10-10
  • 通過(guò)示例深度理解Go channel range

    通過(guò)示例深度理解Go channel range

    這篇文章主要為大家介紹了Go channel range使用示例深度理解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • Golang超全面講解并發(fā)

    Golang超全面講解并發(fā)

    goroutine?不是os線程、不是綠色線程(由語(yǔ)言運(yùn)行時(shí)管理的線程),是協(xié)程。協(xié)程是一種非搶占式的簡(jiǎn)單并發(fā)子goroutine(函數(shù)、閉包或方法),也就是說(shuō),它們不能被中斷。取而代之的是,協(xié)程有多個(gè)點(diǎn),允許暫?;蛑匦逻M(jìn)入?—Go語(yǔ)言并發(fā)之道
    2022-06-06
  • gorm 結(jié)構(gòu)體中 binding 和 msg 結(jié)構(gòu)體標(biāo)簽示例詳解

    gorm 結(jié)構(gòu)體中 binding 和 msg 結(jié)構(gòu)體標(biāo)簽示例詳解

    文章介紹了Gin框架中binding和msg結(jié)構(gòu)體標(biāo)簽的使用,包括基本用法、常用驗(yàn)證規(guī)則、自定義驗(yàn)證器、錯(cuò)誤信息自定義、控制器使用示例、組合驗(yàn)證規(guī)則、跨字段驗(yàn)證和初始化驗(yàn)證器等,這些標(biāo)簽主要用于數(shù)據(jù)驗(yàn)證、自定義錯(cuò)誤信息、參數(shù)綁定和表單驗(yàn)證
    2024-11-11
  • 一文帶你了解Go語(yǔ)言fmt標(biāo)準(zhǔn)庫(kù)輸出函數(shù)的使用

    一文帶你了解Go語(yǔ)言fmt標(biāo)準(zhǔn)庫(kù)輸出函數(shù)的使用

    這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言中 fmt 標(biāo)準(zhǔn)庫(kù)輸出函數(shù)的使用,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起了解一下
    2022-12-12
  • Go語(yǔ)言使用sqlx操作MySQL

    Go語(yǔ)言使用sqlx操作MySQL

    sqlx 包作為一個(gè)擴(kuò)展庫(kù),它在 database/sql 的基礎(chǔ)上,提供了更高級(jí)別的便利,極大地簡(jiǎn)化了數(shù)據(jù)庫(kù)操作,本文章將介紹如何通過(guò)sqlx包來(lái)操作 MySQL 數(shù)據(jù)庫(kù),感興趣的可以了解下
    2024-11-11

最新評(píng)論