詳解Go語言中的監(jiān)視器模式與配置熱更新
上篇介紹 GO 的 GUI 庫 Fyne 時,提到 Fyne 的數據綁定用到了監(jiān)聽器模式。本文就展開說下我對 Go 中監(jiān)聽器模式的理解和應用吧。
監(jiān)聽器模式簡介
監(jiān)聽器模式,或稱觀察者模式,它主要涉及兩個組件:主題(Subject)和監(jiān)聽器(Listener)。
Subject 負責維護一系列的監(jiān)聽器,在所觀測主題狀態(tài)變化,將這個事件通知給所有注冊的監(jiān)聽器。我統一將其定義為注冊中心 Registry。而監(jiān)聽器 Listener 則是實現了特定接口的對象,用于響應事件消息,執(zhí)行處理邏輯。
對具體應用而言,通常還會分出一個 Watcher
或者 Monitor
用于檢測變化并推送給 Registry
。從而實現將檢測目標從系統解耦,無視監(jiān)控組件類別。
這個模式在組件之間建立一種松散耦合的關系。將特定事件通知到關心它的其他組件,無需它們直接相互引用??雌饋磉@個不也是發(fā)布-訂閱模式嗎?差不多一個意思。
之前工作中,用它最多的是配置的熱更新場景,這篇文章也會簡單介紹基于它的 ETCD 配置熱更新。
Go 實現監(jiān)聽器模式
如何用 Go 實現監(jiān)聽模式?我將定義兩個新類型分別是注冊中心(Registry)和監(jiān)聽器接口(Listener)。
首先是 Listener
,它是一個接口,用于實現事件的響應邏輯。
type Listener interface { OnTrigger() }
先將其定義為一個接口,它的實現類型要求支持 OnTrigger
方法,會在事件發(fā)生時被執(zhí)行。
type Registry struct { listeners []Listener } func (r *Registry) Register(l Listener) { r.listeners = append(r.listeners, l) } func (r *Registry) NotifyAll() { for _, listener := range r.listeners { listener.OnTrigger(key, value) } }
Registry
是所有監(jiān)聽器的注冊地,當特定事件發(fā)生,我們通過 Registry.NotifyAll
將事件傳遞給所有 Listener
。
我們實現一個簡單的案例,當監(jiān)聽到某個事件發(fā)生,打印 "A specified event accured"。
為了模擬效果,本案例沒有 watcher,直接通過主函數調用 NotifyAll
模擬觸發(fā)事件。
為了打印事件消息,我們實現 Listener
接口,創(chuàng)建新類型 EventPrinter,如下所示:
type EventPrinter struct { } func (printer *EventPrinter) OnTrigger() { fmt.Println("A specified event accured!") }
寫個主函數觸發(fā)下事件,測試看看是否符合預期,代碼如下所示:
func main() { r := &Registry{} r.Reigster(&EventPrinter{}) // 模擬接收到消息,觸發(fā)事件通知 r.NotifyAll() }
執(zhí)行測試,內容如下所示:
$ go run main.go A specified event occurred
如果希望自定義處理函數,只需讓 Listener
支持自定義事件回調函數即可。
修改代碼如下所示:
type EventHandler struct { callback func() } func NewEventHandler(callback func()) *EventHandler { return &EventHandler{callback: callback} } func (e *EventHandler) OnTrigger() { e.callback() }
我們注冊一個 EventHandler
到 Registry
,主函數代碼:
func main() { r := &Registry{} r.Reigster(&EventPrinter{}) r.Reigster(NewEventHandler(func() { fmt.Println("Custom Print: a specified event occurred!") })) r.NotifyAll() }
測試執(zhí)行:
$ go run main.go A specified event occurred! Custom Print: a specified event occurred!
基于 Go Channel 實現并發(fā)處理
前面的示例中 NotifyAll
是通過 for 循環(huán)依次調用 listener.OnTrigger
將消息發(fā)送給 Listener
,處理效率低下。
如何加速呢?
最直接的方法是通過 goroutine
運行 listener.OnTrigger
方法。
func (r *Registry) NotifyAll() { for _, listener := range r.listeners { go listener.OnTrigger() } }
還有一種方法,通過 Channel 傳遞事件消息,這樣每個 Listener
有獨立的 goroutine 監(jiān)聽和處理。
如下是 Listener
的實現代碼:
type Listener struct { EventChannel chan struct{} Callback func() } func NewListener(callback func()) *Listener { return &Listener{ EventChannel: make(chan struct{}, 1), // 帶緩沖的 channel,防止阻塞 Callback: callback, } } func (l *Listener) Start() { go func() { for range l.EventChannel { l.Callback() } }() }
這里 Listener
的事件處理函數在單獨的 goroutine 中運行。而相應的 Registry 實現也需要修改,代碼變更如下所示:
type Registry struct { listeners []*Listener } func (r *Registry) Register(listener *Listener) { r.listeners = append(r.listeners, listener) listener.Start() // 啟動監(jiān)聽器的 goroutine } func (r *Registry) NotifyAll(message string) { for _, listener := range r.listeners { listener.EventChannel <- struct{}{} // 發(fā)送事件到監(jiān)聽器 } } func (r *Registry) Close() { for _, listener := range r.listeners { close(listener.EventChannel) // 關閉 channel,停止監(jiān)聽器 goroutine } }
整體上的變化不大,在 listner.Register
方法中啟動 Listener
事件處理 goroutine 等待事件消息。
實際案例:ETCD 配置熱更新
讓我們實踐一個具體的應用場景:實現配置的動態(tài)更新以及組件的自動重連機制。
我們將針對包括 MySQL、Redis 在內的各種組件,實現它們在配置變更時能夠自動重連。這些組件的配置信息將以 JSON 格式存儲于 ETCD 的多個鍵(Key)中。
假設,配置結構如下所示:
type MySQLConfig struct { Host string Port int User string Password string } type RedisConfig struct { Host string Port int }
這些配置被保存在 ETCD 中,我們要實時監(jiān)控配置的變化并據此更新配置和執(zhí)行重連操作。
示例用法如下所示:
registry.Register("/config/mysql", func(data) { // unmarshal data // reconnect mysql })
讓我們基于監(jiān)聽器模式簡單設計一個模塊,實現 ETCD 熱更新:
- 每個監(jiān)聽器可以訂閱特定的 key 或 key 前綴的更新事件。
- 使用
channel
通知配置變更,觸發(fā)對應的監(jiān)聽回調。
這個示例,函數回調和輪詢其實已經滿足需求,此處只是為了演示,而是否使用 channel 要具體分析。
我們這個設計要涉及到三個部分。分別是 Watcher、Listener 和 Registry。
- Watcher 責監(jiān)聽 ETCD 中的 key 變更事件。
- Listener 定義了當特定 key 發(fā)生變化時需要執(zhí)行的回調邏輯。
- Registry 管理所有 Listener,將 ETCD 變更事件分發(fā)給對應 Listener。
先定義 Event
類型,一個簡單的結構體,表示 ETCD 中 key 的變更事件:
type Event struct { Key string Value string }
Listener
Listener
實現如下所示:
type Listener struct { EventChannel chan *Event Callback func(*Event) } func NewListner(callback func(*Event)) *Listener { l := &Listener{ EventChannel: make(chan *Event), Callback: callback, } return l } func (l *Listener) Start() { go func() { for event := range l.EventChannel { l.Callback(event) } }() }
基本之前的沒太大差別,從 EventChannel
中拿到事件消息,調用回調函數。
實現 Registry
Registry
負責維護 Listener
的注冊,并在接收到 key 變更事件時通知相關的 Listener
:
type Registry struct { listeners map[string][]*Listener } func NewRegistry() *Registry { return &Registry{ listeners: make(map[string][]*Listener), } } func (r *Registry) Register(key string, listener *Listener) { r.listeners[key] = append(r.listeners[key], listener) listener.Start() } func (r *Registry) Notify(event *Event) { if listeners, ok := r.listeners[event.Key]; ok { for _, listener := range listeners { listener.EventChannel <- event } } }
注冊 Listener
到 Registry
中,通過 map
將 key
與 Listener
關聯起來。
實現 Watcher
Watcher
負責從 ETCD 訂閱 key 的變更事件,并將這些事件發(fā)送到 Registry
的 eventChannel
上:
func WatchEtcdKeys(client *clientv3.Client, registry *Registry, watchKeys ...string) { for _, key := range watchKeys { go func(key string) { watchChan := client.Watch(context.Background(), key, clientv3.WithPrefix()) for wresp := range watchChan { for _, ev := range wresp.Events { event := &Event{ Key: string(ev.Kv.Key), Value: string(ev.Kv.Value), } registry.Notify(event) } } }(key) } }
使用示例
讓我們實際在 main 函數上使用一下,觀察行為是否正常。
func main() { client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, }) if err != nil { log.Fatal(err) } defer client.Close() registry := NewRegistry() // 注冊監(jiān)聽器 registry.Register("/config/mysql", NewListener(func(event * Event) { fmt.Println(event) // 執(zhí)行數據重連之類的操作 })) // 開始監(jiān)聽 ETCD key 變更 WatchEtcdKeys(client, registry, "/config/") time.Sleep(10 * time.Minute) }
這個示例創(chuàng)建了一個 ETCD 客戶端,初始化了一個 Registry
,并為特定的 key 注冊了一個 Listener
。然后,通過 WatchEtcdKeys
函數開始監(jiān)聽 /config/
前綴下的所有 key 的變更。
這種設計支持對特定 key 或 key 前綴的監(jiān)聽。當相關 key 變更時,通過 channel
通知 Listener
,而收到更新事件后的具體操作。視場景而定,這里是執(zhí)行重連操作。
特別說明,示例僅作為概念驗證,實際應用中需要更多的錯誤處理和優(yōu)化。
到此這篇關于詳解Go語言中的監(jiān)視器模式與配置熱更新的文章就介紹到這了,更多相關Go監(jiān)視器模式與配置熱更新內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
深入學習Golang并發(fā)編程必備利器之sync.Cond類型
Go?語言的?sync?包提供了一系列同步原語,其中?sync.Cond?就是其中之一。本文將深入探討?sync.Cond?的實現原理和使用方法,幫助大家更好地理解和應用?sync.Cond,需要的可以參考一下2023-05-05