Go高級特性探究之優(yōu)先級隊列詳解
什么是heap
Heap 是一種數(shù)據(jù)結(jié)構(gòu),其中包含一個特殊的根節(jié)點,且每個節(jié)點的值都不小于(或不大于)其所有子節(jié)點的值。這種數(shù)據(jù)結(jié)構(gòu)常用于實現(xiàn)優(yōu)先隊列。
Heap的數(shù)據(jù)結(jié)構(gòu)
Heap 可以通過一個數(shù)組來實現(xiàn),這個數(shù)組滿足以下條件:
- 和二叉搜索樹不同,堆并不需要滿足左子節(jié)點小于父節(jié)點的值,右子節(jié)點大于父節(jié)點的值的條件。
- 堆中的一些列節(jié)點按照某種特定的順序排列。這樣的順序可以是最小的元素在最前面,也可以是最大的元素在最前面。這個順序滿足父節(jié)點一定小于(或大于)它的所有子節(jié)點。
- 堆中的元素數(shù)量不一定是滿的,也就是說堆并不一定是一個完全二叉樹。
堆具有以下屬性。
- 任何節(jié)點都小于(或大于)其所有后代,并且最小元素(或最大元素)位于堆的根(堆有序性)。
- 堆始終是一棵完整的樹。即各級節(jié)點都填充除底層以外的元素,并且底層盡可能從左到右填充。
完全二叉樹和滿二叉樹的區(qū)別如下所示。

根節(jié)點最大的堆稱為最大堆或大根堆,根節(jié)點最小的堆稱為最小堆或小根堆。
由于堆是完全二叉樹,因此它們可以表示為順序數(shù)組,如下所示。

如何實現(xiàn)優(yōu)先級隊列
優(yōu)先隊列是一種數(shù)據(jù)結(jié)構(gòu),其中每個元素都有一個優(yōu)先級,優(yōu)先級高的元素在前面,優(yōu)先級相同時按照插入順序排列。可以使用堆來實現(xiàn)優(yōu)先隊列。實現(xiàn)優(yōu)先隊列的關(guān)鍵是將一個元素添加到隊列中,并保持隊列中的元素有序。如果使用數(shù)組來存儲元素,需要頻繁對數(shù)組進行調(diào)整,時間復雜度是O(n),不夠高效。如果使用堆來存儲元素,則可以在插入時進行堆化,時間復雜度是O(nlogn)。
在堆中,節(jié)點的位置與它們在數(shù)組中的位置有一定的關(guān)系。例如,根節(jié)點位于數(shù)組的第一個元素,其他節(jié)點依次排列。左子節(jié)點位于(2i),右子節(jié)點位于(2i+1),父節(jié)點位于(i/2)。這個關(guān)系可以方便地實現(xiàn)在數(shù)組上進行堆化的操作。

為什么需要使用優(yōu)先級隊列
優(yōu)先級隊列是一種非常有用的數(shù)據(jù)結(jié)構(gòu),在很多應用中都會被廣泛使用。比如作業(yè)調(diào)度、事件管理等領域,都需要使用優(yōu)先級隊列來幫助處理任務以及事件等的優(yōu)先級順序。
優(yōu)點和缺點
優(yōu)點
- 簡單高效:優(yōu)先級隊列的實現(xiàn)較為簡單,查找和插入等操作都可以在 O(log(n))O(log(n))O(log(n)) 的時間復雜度內(nèi)完成,所以在實現(xiàn)簡單的情況下,可以極大提高程序性能。
- 優(yōu)先級:優(yōu)先級隊列可以根據(jù)任務或者事件的優(yōu)先級,對其按照優(yōu)先級大小進行排序,并在需要的時候依次處理。
缺點
- 空間占用:優(yōu)先級隊列需要占用額外的內(nèi)存空間,以存儲任務和事件的優(yōu)先級信息。
- 任務時效性:當優(yōu)先級較高的任務過多時,可能會導致低優(yōu)先級任務的響應延遲,從而影響任務的時效性。
heap PriorityQueue實現(xiàn)
代碼來自github.com/hashicorp/vault/blob/main/sdk/queue/priority_queue.go
package go_pool_priority
import (
"container/heap"
"errors"
"sync"
"github.com/mitchellh/copystructure"
)
// ErrEmpty is returned for queues with no items
var ErrEmpty = errors.New("queue is empty")
// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that
// already exists. The queue does not attempt to update, instead returns this
// error. If an Item needs to be updated or replaced, pop the item first.
var ErrDuplicateItem = errors.New("duplicate item")
// New initializes the internal data structures and returns a new
// PriorityQueue
func NewPriorityQueue() *PriorityQueue {
pq := PriorityQueue{
data: make(queue, 0),
dataMap: make(map[string]*Item),
}
heap.Init(&pq.data)
return &pq
}
// PriorityQueue facilitates queue of Items, providing Push, Pop, and
// PopByKey convenience methods. The ordering (priority) is an int64 value
// with the smallest value is the highest priority. PriorityQueue maintains both
// an internal slice for the queue as well as a map of the same items with their
// keys as the index. This enables users to find specific items by key. The map
// must be kept in sync with the data slice.
// See https://golang.org/pkg/container/heap/#example__priorityQueue
type PriorityQueue struct {
// data is the internal structure that holds the queue, and is operated on by
// heap functions
data queue
// dataMap represents all the items in the queue, with unique indexes, used
// for finding specific items. dataMap is kept in sync with the data slice
dataMap map[string]*Item
// lock is a read/write mutex, and used to facilitate read/write locks on the
// data and dataMap fields
lock sync.RWMutex
}
// queue is the internal data structure used to satisfy heap.Interface. This
// prevents users from calling Pop and Push heap methods directly
type queue []*Item
// Item is something managed in the priority queue
type Item struct {
// Key is a unique string used to identify items in the internal data map
Key string
// Value is an unspecified type that implementations can use to store
// information
Value interface{}
// Priority determines ordering in the queue, with the lowest value being the
// highest priority
Priority int64
// index is an internal value used by the heap package, and should not be
// modified by any consumer of the priority queue
index int
}
// Len returns the count of items in the Priority Queue
func (pq *PriorityQueue) Len() int {
pq.lock.RLock()
defer pq.lock.RUnlock()
return pq.data.Len()
}
// Pop pops the highest priority item from the queue. This is a
// wrapper/convenience method that calls heap.Pop, so consumers do not need to
// invoke heap functions directly
func (pq *PriorityQueue) Pop() (*Item, error) {
pq.lock.Lock()
defer pq.lock.Unlock()
if pq.data.Len() == 0 {
return nil, ErrEmpty
}
item := heap.Pop(&pq.data).(*Item)
delete(pq.dataMap, item.Key)
return item, nil
}
// Push pushes an item on to the queue. This is a wrapper/convenience
// method that calls heap.Push, so consumers do not need to invoke heap
// functions directly. Items must have unique Keys, and Items in the queue
// cannot be updated. To modify an Item, users must first remove it and re-push
// it after modifications
func (pq *PriorityQueue) Push(i *Item) error {
if i == nil || i.Key == "" {
return errors.New("error adding item: Item Key is required")
}
pq.lock.Lock()
defer pq.lock.Unlock()
if _, ok := pq.dataMap[i.Key]; ok {
return ErrDuplicateItem
}
// Copy the item value(s) so that modifications to the source item does not
// affect the item on the queue
clone, err := copystructure.Copy(i)
if err != nil {
return err
}
pq.dataMap[i.Key] = clone.(*Item)
heap.Push(&pq.data, clone)
return nil
}
// PopByKey searches the queue for an item with the given key and removes it
// from the queue if found. Returns nil if not found. This method must fix the
// queue after removing any key.
func (pq *PriorityQueue) PopByKey(key string) (*Item, error) {
pq.lock.Lock()
defer pq.lock.Unlock()
item, ok := pq.dataMap[key]
if !ok {
return nil, nil
}
// Remove the item the heap and delete it from the dataMap
itemRaw := heap.Remove(&pq.data, item.index)
delete(pq.dataMap, key)
if itemRaw != nil {
if i, ok := itemRaw.(*Item); ok {
return i, nil
}
}
return nil, nil
}
// Len returns the number of items in the queue data structure. Do not use this
// method directly on the queue, use PriorityQueue.Len() instead.
func (q queue) Len() int { return len(q) }
// Less returns whether the Item with index i should sort before the Item with
// index j in the queue. This method is used by the queue to determine priority
// internally; the Item with the lower value wins. (priority zero is higher
// priority than 1). The priority of Items with equal values is undetermined.
func (q queue) Less(i, j int) bool {
return q[i].Priority < q[j].Priority
}
// Swap swaps things in-place; part of sort.Interface
func (q queue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
// Push is used by heap.Interface to push items onto the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Push(x interface{}) {
n := len(*q)
item := x.(*Item)
item.index = n
*q = append(*q, item)
}
// Pop is used by heap.Interface to pop items off of the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Pop() interface{} {
old := *q
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*q = old[0 : n-1]
return item
}- 內(nèi)部使用container/heap中的Interface接口實現(xiàn)堆結(jié)構(gòu);
- 提供了Push、Pop和PopByKey等一系列方法;
- 使用一個內(nèi)部slice和一個以Key為索引的映射map來維護隊列元素;
- 根據(jù)元素的Priority值進行優(yōu)先級排序,Priority值越小表示優(yōu)先級越高;
- 在Push時需要保證Key值唯一;
- PopByKey方法可以根據(jù)Key查找并移除對應的元素。
實現(xiàn)思路
既然,我們了解了heap的一些特性,那么我們接下來就要考慮如何用現(xiàn)有的數(shù)據(jù)結(jié)構(gòu),實現(xiàn)優(yōu)先隊列。
我們都知道,無論是哪一種隊列,必然是存在生產(chǎn)者和消費者兩個部分,對于優(yōu)先級隊列來說,更是如此。因此,咱們的實現(xiàn)思路,也將從這兩個部分來談。
生產(chǎn)者
對于生產(chǎn)者來說,他只需要推送一個任務及其優(yōu)先級過來,咱們就得根據(jù)優(yōu)先級處理他的任務。
由于,我們不大好判斷,到底會有多少種不同的優(yōu)先級傳過來,也無法確定,每種優(yōu)先級下有多少個任務要處理,所以,我們可以直接使用heap存儲task
消費者
對于消費者來說,他需要獲取優(yōu)先級最高的任務進行消費。使用heap pop 取出優(yōu)先級最高的任務即可

數(shù)據(jù)結(jié)構(gòu)
(1)優(yōu)先級隊列對象
type PriorityQueueTask struct {
mLock sync.Mutex // 互斥鎖,queues和priorities并發(fā)操作時使用,當然針對當前讀多寫少的場景,也可以使用讀寫鎖
pushChan chan *task // 推送任務管道
pq *PriorityQueue
}(2)任務對象
type task struct {
priority int64 // 任務的優(yōu)先級
value interface{}
key string
}初始化優(yōu)先級隊列對象
在初始化對象時,需要先通過 NewPriorityQueue() 函數(shù)創(chuàng)建一個空的 PriorityQueue,然后再創(chuàng)建一個 PriorityQueueTask 對象,并將剛剛創(chuàng)建的 PriorityQueue 賦值給該對象的 pq 屬性。同時,還要創(chuàng)建一個用于接收推送任務的管道,用于在生產(chǎn)者推送任務時,將新任務添加到隊列中。
func NewPriorityQueueTask() *PriorityQueueTask {
pq := &PriorityQueueTask{
pushChan: make(chan *task, 100),
pq: NewPriorityQueue(),
}
// 監(jiān)聽pushChan
go pq.listenPushChan()
return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
for {
select {
case taskEle := <-pq.pushChan:
pq.mLock.Lock()
pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
pq.mLock.Unlock()
}
}
}生產(chǎn)者推送任務
生產(chǎn)者向推送任務管道中推送新任務時,實際上是將一個 task 結(jié)構(gòu)體實例發(fā)送到了管道中。在 task 結(jié)構(gòu)體中,priority 屬性表示這個任務的優(yōu)先級,value 屬性表示這個任務的值,key 屬性表示這個任務的鍵。
// 插入work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
? ? pq.pushChan <- &task{
? ? ? ? value:? ? value,
? ? ? ? priority: priority,
? ? ? ? key:? ? ? key,
? ? }
}消費者消費隊列
消費者從隊列中取出一個任務,然后進行相應的操作。在這段代碼中,消費者輪詢獲取最高優(yōu)先級的任務。如果沒有獲取到任務,則繼續(xù)輪詢;如果獲取到了任務,則執(zhí)行對應的操作。在這里,執(zhí)行操作的具體形式是打印任務的編號、優(yōu)先級等信息。
// Consume 消費者輪詢獲取最高優(yōu)先級的任務
func (pq *PriorityQueueTask) Consume() {
? ? for {
? ? ? ? task := pq.Pop()
? ? ? ? if task == nil {
? ? ? ? ? ? // 未獲取到任務,則繼續(xù)輪詢
? ? ? ? ? ? time.Sleep(time.Millisecond)
? ? ? ? ? ? continue
? ? ? ? }
? ? ? ? // 獲取到了任務,就執(zhí)行任務
? ? ? ? fmt.Println("推送任務的編號為:", task.Value)
? ? ? ? fmt.Println("推送的任務優(yōu)先級為:", task.Priority)
? ? ? ? fmt.Println("============")
? ? }
}完整代碼
package go_pool_priority
import (
"fmt"
"sync"
"time"
)
type PriorityQueueTask struct {
mLock sync.Mutex // 互斥鎖,queues和priorities并發(fā)操作時使用,當然針對當前讀多寫少的場景,也可以使用讀寫鎖
pushChan chan *task // 推送任務管道
pq *PriorityQueue
}
type task struct {
priority int64 // 任務的優(yōu)先級
value interface{}
key string
}
func NewPriorityQueueTask() *PriorityQueueTask {
pq := &PriorityQueueTask{
pushChan: make(chan *task, 100),
pq: NewPriorityQueue(),
}
// 監(jiān)聽pushChan
go pq.listenPushChan()
return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
for {
select {
case taskEle := <-pq.pushChan:
pq.mLock.Lock()
pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
pq.mLock.Unlock()
}
}
}
// 插入work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
pq.pushChan <- &task{
value: value,
priority: priority,
key: key,
}
}
// Pop 取出最高優(yōu)先級隊列中的一個任務
func (pq *PriorityQueueTask) Pop() *Item {
pq.mLock.Lock()
defer pq.mLock.Unlock()
item, err := pq.pq.Pop()
if err != nil {
return nil
}
// 如果所有隊列都沒有任務,則返回null
return item
}
// Consume 消費者輪詢獲取最高優(yōu)先級的任務
func (pq *PriorityQueueTask) Consume() {
for {
task := pq.Pop()
if task == nil {
// 未獲取到任務,則繼續(xù)輪詢
time.Sleep(time.Millisecond)
continue
}
// 獲取到了任務,就執(zhí)行任務
fmt.Println("推送任務的編號為:", task.Value)
fmt.Println("推送的任務優(yōu)先級為:", task.Priority)
fmt.Println("============")
}
}測試用例
func TestQueue(t *testing.T) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
pq := NewPriorityQueueTask()
// 我們在這里,隨機生成一些優(yōu)先級任務
for i := 0; i < 100; i++ {
a := rand.Intn(1000)
go func(a int64) {
pq.Push(a, a, strconv.Itoa(int(a)))
}(int64(a))
}
// 這里會阻塞,消費者會輪詢查詢?nèi)蝿贞犃?
pq.Consume()
}以上就是Go高級特性探究之優(yōu)先級隊列詳解的詳細內(nèi)容,更多關(guān)于Go優(yōu)先級隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang?gorm的預加載及軟刪硬刪的數(shù)據(jù)操作示例
這篇文章主要介紹了golang?gorm的預加載及軟刪硬刪的數(shù)據(jù)操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2022-04-04
golang中channel+error來做異步錯誤處理有多香
官方推薦golang中錯誤處理當做值處理, 既然是值那就可以在channel中傳輸,這篇文章主要介紹了golang 錯誤處理channel+error真的香,需要的朋友可以參考下2023-01-01
Golang的循環(huán)中break和continue語句的用法講解
這篇文章主要介紹了Golang的循環(huán)中break和continue語句的用法講解,是Go語言入門學習中的基礎知識,需要的朋友可以參考下2015-10-10
Go語言rune與字符串轉(zhuǎn)換的密切關(guān)系解析
這篇文章主要為大家介紹了Go語言rune與字符串轉(zhuǎn)換的密切關(guān)系示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12
golang數(shù)組-----尋找數(shù)組中缺失的整數(shù)方法
這篇文章主要介紹了golang數(shù)組-----尋找數(shù)組中缺失的整數(shù)方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12

