Go高級(jí)特性探究之優(yōu)先級(jí)隊(duì)列詳解
什么是heap
Heap 是一種數(shù)據(jù)結(jié)構(gòu),其中包含一個(gè)特殊的根節(jié)點(diǎn),且每個(gè)節(jié)點(diǎn)的值都不小于(或不大于)其所有子節(jié)點(diǎn)的值。這種數(shù)據(jù)結(jié)構(gòu)常用于實(shí)現(xiàn)優(yōu)先隊(duì)列。
Heap的數(shù)據(jù)結(jié)構(gòu)
Heap 可以通過一個(gè)數(shù)組來實(shí)現(xiàn),這個(gè)數(shù)組滿足以下條件:
- 和二叉搜索樹不同,堆并不需要滿足左子節(jié)點(diǎn)小于父節(jié)點(diǎn)的值,右子節(jié)點(diǎn)大于父節(jié)點(diǎn)的值的條件。
- 堆中的一些列節(jié)點(diǎn)按照某種特定的順序排列。這樣的順序可以是最小的元素在最前面,也可以是最大的元素在最前面。這個(gè)順序滿足父節(jié)點(diǎn)一定小于(或大于)它的所有子節(jié)點(diǎn)。
- 堆中的元素?cái)?shù)量不一定是滿的,也就是說堆并不一定是一個(gè)完全二叉樹。
堆具有以下屬性。
- 任何節(jié)點(diǎn)都小于(或大于)其所有后代,并且最小元素(或最大元素)位于堆的根(堆有序性)。
- 堆始終是一棵完整的樹。即各級(jí)節(jié)點(diǎn)都填充除底層以外的元素,并且底層盡可能從左到右填充。
完全二叉樹和滿二叉樹的區(qū)別如下所示。

根節(jié)點(diǎn)最大的堆稱為最大堆或大根堆,根節(jié)點(diǎn)最小的堆稱為最小堆或小根堆。
由于堆是完全二叉樹,因此它們可以表示為順序數(shù)組,如下所示。

如何實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列
優(yōu)先隊(duì)列是一種數(shù)據(jù)結(jié)構(gòu),其中每個(gè)元素都有一個(gè)優(yōu)先級(jí),優(yōu)先級(jí)高的元素在前面,優(yōu)先級(jí)相同時(shí)按照插入順序排列。可以使用堆來實(shí)現(xiàn)優(yōu)先隊(duì)列。實(shí)現(xiàn)優(yōu)先隊(duì)列的關(guān)鍵是將一個(gè)元素添加到隊(duì)列中,并保持隊(duì)列中的元素有序。如果使用數(shù)組來存儲(chǔ)元素,需要頻繁對(duì)數(shù)組進(jìn)行調(diào)整,時(shí)間復(fù)雜度是O(n),不夠高效。如果使用堆來存儲(chǔ)元素,則可以在插入時(shí)進(jìn)行堆化,時(shí)間復(fù)雜度是O(nlogn)。
在堆中,節(jié)點(diǎn)的位置與它們?cè)跀?shù)組中的位置有一定的關(guān)系。例如,根節(jié)點(diǎn)位于數(shù)組的第一個(gè)元素,其他節(jié)點(diǎn)依次排列。左子節(jié)點(diǎn)位于(2i),右子節(jié)點(diǎn)位于(2i+1),父節(jié)點(diǎn)位于(i/2)。這個(gè)關(guān)系可以方便地實(shí)現(xiàn)在數(shù)組上進(jìn)行堆化的操作。

為什么需要使用優(yōu)先級(jí)隊(duì)列
優(yōu)先級(jí)隊(duì)列是一種非常有用的數(shù)據(jù)結(jié)構(gòu),在很多應(yīng)用中都會(huì)被廣泛使用。比如作業(yè)調(diào)度、事件管理等領(lǐng)域,都需要使用優(yōu)先級(jí)隊(duì)列來幫助處理任務(wù)以及事件等的優(yōu)先級(jí)順序。
優(yōu)點(diǎn)和缺點(diǎn)
優(yōu)點(diǎn)
- 簡(jiǎn)單高效:優(yōu)先級(jí)隊(duì)列的實(shí)現(xiàn)較為簡(jiǎn)單,查找和插入等操作都可以在 O(log(n))O(log(n))O(log(n)) 的時(shí)間復(fù)雜度內(nèi)完成,所以在實(shí)現(xiàn)簡(jiǎn)單的情況下,可以極大提高程序性能。
- 優(yōu)先級(jí):優(yōu)先級(jí)隊(duì)列可以根據(jù)任務(wù)或者事件的優(yōu)先級(jí),對(duì)其按照優(yōu)先級(jí)大小進(jìn)行排序,并在需要的時(shí)候依次處理。
缺點(diǎn)
- 空間占用:優(yōu)先級(jí)隊(duì)列需要占用額外的內(nèi)存空間,以存儲(chǔ)任務(wù)和事件的優(yōu)先級(jí)信息。
- 任務(wù)時(shí)效性:當(dāng)優(yōu)先級(jí)較高的任務(wù)過多時(shí),可能會(huì)導(dǎo)致低優(yōu)先級(jí)任務(wù)的響應(yīng)延遲,從而影響任務(wù)的時(shí)效性。
heap PriorityQueue實(shí)現(xiàn)
代碼來自github.com/hashicorp/vault/blob/main/sdk/queue/priority_queue.go
package go_pool_priority
import (
"container/heap"
"errors"
"sync"
"github.com/mitchellh/copystructure"
)
// ErrEmpty is returned for queues with no items
var ErrEmpty = errors.New("queue is empty")
// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that
// already exists. The queue does not attempt to update, instead returns this
// error. If an Item needs to be updated or replaced, pop the item first.
var ErrDuplicateItem = errors.New("duplicate item")
// New initializes the internal data structures and returns a new
// PriorityQueue
func NewPriorityQueue() *PriorityQueue {
pq := PriorityQueue{
data: make(queue, 0),
dataMap: make(map[string]*Item),
}
heap.Init(&pq.data)
return &pq
}
// PriorityQueue facilitates queue of Items, providing Push, Pop, and
// PopByKey convenience methods. The ordering (priority) is an int64 value
// with the smallest value is the highest priority. PriorityQueue maintains both
// an internal slice for the queue as well as a map of the same items with their
// keys as the index. This enables users to find specific items by key. The map
// must be kept in sync with the data slice.
// See https://golang.org/pkg/container/heap/#example__priorityQueue
type PriorityQueue struct {
// data is the internal structure that holds the queue, and is operated on by
// heap functions
data queue
// dataMap represents all the items in the queue, with unique indexes, used
// for finding specific items. dataMap is kept in sync with the data slice
dataMap map[string]*Item
// lock is a read/write mutex, and used to facilitate read/write locks on the
// data and dataMap fields
lock sync.RWMutex
}
// queue is the internal data structure used to satisfy heap.Interface. This
// prevents users from calling Pop and Push heap methods directly
type queue []*Item
// Item is something managed in the priority queue
type Item struct {
// Key is a unique string used to identify items in the internal data map
Key string
// Value is an unspecified type that implementations can use to store
// information
Value interface{}
// Priority determines ordering in the queue, with the lowest value being the
// highest priority
Priority int64
// index is an internal value used by the heap package, and should not be
// modified by any consumer of the priority queue
index int
}
// Len returns the count of items in the Priority Queue
func (pq *PriorityQueue) Len() int {
pq.lock.RLock()
defer pq.lock.RUnlock()
return pq.data.Len()
}
// Pop pops the highest priority item from the queue. This is a
// wrapper/convenience method that calls heap.Pop, so consumers do not need to
// invoke heap functions directly
func (pq *PriorityQueue) Pop() (*Item, error) {
pq.lock.Lock()
defer pq.lock.Unlock()
if pq.data.Len() == 0 {
return nil, ErrEmpty
}
item := heap.Pop(&pq.data).(*Item)
delete(pq.dataMap, item.Key)
return item, nil
}
// Push pushes an item on to the queue. This is a wrapper/convenience
// method that calls heap.Push, so consumers do not need to invoke heap
// functions directly. Items must have unique Keys, and Items in the queue
// cannot be updated. To modify an Item, users must first remove it and re-push
// it after modifications
func (pq *PriorityQueue) Push(i *Item) error {
if i == nil || i.Key == "" {
return errors.New("error adding item: Item Key is required")
}
pq.lock.Lock()
defer pq.lock.Unlock()
if _, ok := pq.dataMap[i.Key]; ok {
return ErrDuplicateItem
}
// Copy the item value(s) so that modifications to the source item does not
// affect the item on the queue
clone, err := copystructure.Copy(i)
if err != nil {
return err
}
pq.dataMap[i.Key] = clone.(*Item)
heap.Push(&pq.data, clone)
return nil
}
// PopByKey searches the queue for an item with the given key and removes it
// from the queue if found. Returns nil if not found. This method must fix the
// queue after removing any key.
func (pq *PriorityQueue) PopByKey(key string) (*Item, error) {
pq.lock.Lock()
defer pq.lock.Unlock()
item, ok := pq.dataMap[key]
if !ok {
return nil, nil
}
// Remove the item the heap and delete it from the dataMap
itemRaw := heap.Remove(&pq.data, item.index)
delete(pq.dataMap, key)
if itemRaw != nil {
if i, ok := itemRaw.(*Item); ok {
return i, nil
}
}
return nil, nil
}
// Len returns the number of items in the queue data structure. Do not use this
// method directly on the queue, use PriorityQueue.Len() instead.
func (q queue) Len() int { return len(q) }
// Less returns whether the Item with index i should sort before the Item with
// index j in the queue. This method is used by the queue to determine priority
// internally; the Item with the lower value wins. (priority zero is higher
// priority than 1). The priority of Items with equal values is undetermined.
func (q queue) Less(i, j int) bool {
return q[i].Priority < q[j].Priority
}
// Swap swaps things in-place; part of sort.Interface
func (q queue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
// Push is used by heap.Interface to push items onto the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Push(x interface{}) {
n := len(*q)
item := x.(*Item)
item.index = n
*q = append(*q, item)
}
// Pop is used by heap.Interface to pop items off of the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Pop() interface{} {
old := *q
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*q = old[0 : n-1]
return item
}- 內(nèi)部使用container/heap中的Interface接口實(shí)現(xiàn)堆結(jié)構(gòu);
- 提供了Push、Pop和PopByKey等一系列方法;
- 使用一個(gè)內(nèi)部slice和一個(gè)以Key為索引的映射map來維護(hù)隊(duì)列元素;
- 根據(jù)元素的Priority值進(jìn)行優(yōu)先級(jí)排序,Priority值越小表示優(yōu)先級(jí)越高;
- 在Push時(shí)需要保證Key值唯一;
- PopByKey方法可以根據(jù)Key查找并移除對(duì)應(yīng)的元素。
實(shí)現(xiàn)思路
既然,我們了解了heap的一些特性,那么我們接下來就要考慮如何用現(xiàn)有的數(shù)據(jù)結(jié)構(gòu),實(shí)現(xiàn)優(yōu)先隊(duì)列。
我們都知道,無論是哪一種隊(duì)列,必然是存在生產(chǎn)者和消費(fèi)者兩個(gè)部分,對(duì)于優(yōu)先級(jí)隊(duì)列來說,更是如此。因此,咱們的實(shí)現(xiàn)思路,也將從這兩個(gè)部分來談。
生產(chǎn)者
對(duì)于生產(chǎn)者來說,他只需要推送一個(gè)任務(wù)及其優(yōu)先級(jí)過來,咱們就得根據(jù)優(yōu)先級(jí)處理他的任務(wù)。
由于,我們不大好判斷,到底會(huì)有多少種不同的優(yōu)先級(jí)傳過來,也無法確定,每種優(yōu)先級(jí)下有多少個(gè)任務(wù)要處理,所以,我們可以直接使用heap存儲(chǔ)task
消費(fèi)者
對(duì)于消費(fèi)者來說,他需要獲取優(yōu)先級(jí)最高的任務(wù)進(jìn)行消費(fèi)。使用heap pop 取出優(yōu)先級(jí)最高的任務(wù)即可

數(shù)據(jù)結(jié)構(gòu)
(1)優(yōu)先級(jí)隊(duì)列對(duì)象
type PriorityQueueTask struct {
mLock sync.Mutex // 互斥鎖,queues和priorities并發(fā)操作時(shí)使用,當(dāng)然針對(duì)當(dāng)前讀多寫少的場(chǎng)景,也可以使用讀寫鎖
pushChan chan *task // 推送任務(wù)管道
pq *PriorityQueue
}(2)任務(wù)對(duì)象
type task struct {
priority int64 // 任務(wù)的優(yōu)先級(jí)
value interface{}
key string
}初始化優(yōu)先級(jí)隊(duì)列對(duì)象
在初始化對(duì)象時(shí),需要先通過 NewPriorityQueue() 函數(shù)創(chuàng)建一個(gè)空的 PriorityQueue,然后再創(chuàng)建一個(gè) PriorityQueueTask 對(duì)象,并將剛剛創(chuàng)建的 PriorityQueue 賦值給該對(duì)象的 pq 屬性。同時(shí),還要?jiǎng)?chuàng)建一個(gè)用于接收推送任務(wù)的管道,用于在生產(chǎn)者推送任務(wù)時(shí),將新任務(wù)添加到隊(duì)列中。
func NewPriorityQueueTask() *PriorityQueueTask {
pq := &PriorityQueueTask{
pushChan: make(chan *task, 100),
pq: NewPriorityQueue(),
}
// 監(jiān)聽pushChan
go pq.listenPushChan()
return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
for {
select {
case taskEle := <-pq.pushChan:
pq.mLock.Lock()
pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
pq.mLock.Unlock()
}
}
}生產(chǎn)者推送任務(wù)
生產(chǎn)者向推送任務(wù)管道中推送新任務(wù)時(shí),實(shí)際上是將一個(gè) task 結(jié)構(gòu)體實(shí)例發(fā)送到了管道中。在 task 結(jié)構(gòu)體中,priority 屬性表示這個(gè)任務(wù)的優(yōu)先級(jí),value 屬性表示這個(gè)任務(wù)的值,key 屬性表示這個(gè)任務(wù)的鍵。
// 插入work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
? ? pq.pushChan <- &task{
? ? ? ? value:? ? value,
? ? ? ? priority: priority,
? ? ? ? key:? ? ? key,
? ? }
}消費(fèi)者消費(fèi)隊(duì)列
消費(fèi)者從隊(duì)列中取出一個(gè)任務(wù),然后進(jìn)行相應(yīng)的操作。在這段代碼中,消費(fèi)者輪詢獲取最高優(yōu)先級(jí)的任務(wù)。如果沒有獲取到任務(wù),則繼續(xù)輪詢;如果獲取到了任務(wù),則執(zhí)行對(duì)應(yīng)的操作。在這里,執(zhí)行操作的具體形式是打印任務(wù)的編號(hào)、優(yōu)先級(jí)等信息。
// Consume 消費(fèi)者輪詢獲取最高優(yōu)先級(jí)的任務(wù)
func (pq *PriorityQueueTask) Consume() {
? ? for {
? ? ? ? task := pq.Pop()
? ? ? ? if task == nil {
? ? ? ? ? ? // 未獲取到任務(wù),則繼續(xù)輪詢
? ? ? ? ? ? time.Sleep(time.Millisecond)
? ? ? ? ? ? continue
? ? ? ? }
? ? ? ? // 獲取到了任務(wù),就執(zhí)行任務(wù)
? ? ? ? fmt.Println("推送任務(wù)的編號(hào)為:", task.Value)
? ? ? ? fmt.Println("推送的任務(wù)優(yōu)先級(jí)為:", task.Priority)
? ? ? ? fmt.Println("============")
? ? }
}完整代碼
package go_pool_priority
import (
"fmt"
"sync"
"time"
)
type PriorityQueueTask struct {
mLock sync.Mutex // 互斥鎖,queues和priorities并發(fā)操作時(shí)使用,當(dāng)然針對(duì)當(dāng)前讀多寫少的場(chǎng)景,也可以使用讀寫鎖
pushChan chan *task // 推送任務(wù)管道
pq *PriorityQueue
}
type task struct {
priority int64 // 任務(wù)的優(yōu)先級(jí)
value interface{}
key string
}
func NewPriorityQueueTask() *PriorityQueueTask {
pq := &PriorityQueueTask{
pushChan: make(chan *task, 100),
pq: NewPriorityQueue(),
}
// 監(jiān)聽pushChan
go pq.listenPushChan()
return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
for {
select {
case taskEle := <-pq.pushChan:
pq.mLock.Lock()
pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
pq.mLock.Unlock()
}
}
}
// 插入work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
pq.pushChan <- &task{
value: value,
priority: priority,
key: key,
}
}
// Pop 取出最高優(yōu)先級(jí)隊(duì)列中的一個(gè)任務(wù)
func (pq *PriorityQueueTask) Pop() *Item {
pq.mLock.Lock()
defer pq.mLock.Unlock()
item, err := pq.pq.Pop()
if err != nil {
return nil
}
// 如果所有隊(duì)列都沒有任務(wù),則返回null
return item
}
// Consume 消費(fèi)者輪詢獲取最高優(yōu)先級(jí)的任務(wù)
func (pq *PriorityQueueTask) Consume() {
for {
task := pq.Pop()
if task == nil {
// 未獲取到任務(wù),則繼續(xù)輪詢
time.Sleep(time.Millisecond)
continue
}
// 獲取到了任務(wù),就執(zhí)行任務(wù)
fmt.Println("推送任務(wù)的編號(hào)為:", task.Value)
fmt.Println("推送的任務(wù)優(yōu)先級(jí)為:", task.Priority)
fmt.Println("============")
}
}測(cè)試用例
func TestQueue(t *testing.T) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
pq := NewPriorityQueueTask()
// 我們?cè)谶@里,隨機(jī)生成一些優(yōu)先級(jí)任務(wù)
for i := 0; i < 100; i++ {
a := rand.Intn(1000)
go func(a int64) {
pq.Push(a, a, strconv.Itoa(int(a)))
}(int64(a))
}
// 這里會(huì)阻塞,消費(fèi)者會(huì)輪詢查詢?nèi)蝿?wù)隊(duì)列
pq.Consume()
}以上就是Go高級(jí)特性探究之優(yōu)先級(jí)隊(duì)列詳解的詳細(xì)內(nèi)容,更多關(guān)于Go優(yōu)先級(jí)隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Go語言的隊(duì)列和堆棧實(shí)現(xiàn)方法
- 用golang實(shí)現(xiàn)一個(gè)定時(shí)器任務(wù)隊(duì)列實(shí)例
- Django使用Celery異步任務(wù)隊(duì)列的使用
- golang實(shí)現(xiàn)redis的延時(shí)消息隊(duì)列功能示例
- Golang實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列
- 基于Golang實(shí)現(xiàn)延遲隊(duì)列(DelayQueue)
- Golang微服務(wù)框架Kratos實(shí)現(xiàn)Kafka消息隊(duì)列的方法
- Go語言隊(duì)列的四種實(shí)現(xiàn)及使用場(chǎng)景
相關(guān)文章
golang?gorm的預(yù)加載及軟刪硬刪的數(shù)據(jù)操作示例
這篇文章主要介紹了golang?gorm的預(yù)加載及軟刪硬刪的數(shù)據(jù)操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04
Golang使用Gin框架實(shí)現(xiàn)http分塊傳輸
這篇文章主要為大家詳細(xì)介紹了Golang中如何使用Gin框架實(shí)現(xiàn)http分塊傳輸功能,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,需要的可以參考一下2023-05-05
golang中channel+error來做異步錯(cuò)誤處理有多香
官方推薦golang中錯(cuò)誤處理當(dāng)做值處理, 既然是值那就可以在channel中傳輸,這篇文章主要介紹了golang 錯(cuò)誤處理channel+error真的香,需要的朋友可以參考下2023-01-01
Golang的循環(huán)中break和continue語句的用法講解
這篇文章主要介紹了Golang的循環(huán)中break和continue語句的用法講解,是Go語言入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-10-10
Go語言rune與字符串轉(zhuǎn)換的密切關(guān)系解析
這篇文章主要為大家介紹了Go語言rune與字符串轉(zhuǎn)換的密切關(guān)系示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
golang數(shù)組-----尋找數(shù)組中缺失的整數(shù)方法
這篇文章主要介紹了golang數(shù)組-----尋找數(shù)組中缺失的整數(shù)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-12-12

