基于Golang設(shè)計一套可控的定時任務(wù)系統(tǒng)
現(xiàn)在的系統(tǒng)設(shè)計中,有許多規(guī)律性的功能特征需要用到定時任務(wù)來完成,比如每分鐘需要執(zhí)行一次清理數(shù)據(jù)的任務(wù),每個月的第一天,需要處理一項什么任務(wù)等等這種,還有一種規(guī)律性的任務(wù)不是以時間間隔為第一維度切割的,而是如果任務(wù)執(zhí)行完成,不管成功與否,都間隔一段時間執(zhí)行一次任務(wù)等等。
像上面說描述的任務(wù)的特征,都需要我們?nèi)ブ芷谛缘膱?zhí)行任務(wù)主體,如果沒辦法對定時任務(wù)進行嚴(yán)格的控制管理,在生產(chǎn)環(huán)境下是非常危險的。比如上周發(fā)生的一起生產(chǎn)事故,定時任務(wù)去拉取某服務(wù)器的數(shù)據(jù),因為程序異常,導(dǎo)致產(chǎn)生龐大的協(xié)程拉取對方數(shù)據(jù),致使對方數(shù)據(jù)庫崩潰。
所以在享受一件技術(shù)帶來好處的同時,要嘗試在可控的范圍內(nèi)使用,才是我們調(diào)庫人員的基本操守,接下里,我們就一起探究一下,我是如何設(shè)計一套可控的定時任務(wù)系統(tǒng)的。
功能點:
- 任務(wù)的自動注冊
- 任務(wù)的信息管理,包括任務(wù)的cron表達式,任務(wù)的詳情信息備注
- 手動控制任務(wù)的啟動和執(zhí)行
- 任務(wù)的實例管理,可查看單次運行實例的任務(wù)日志
- 任務(wù)優(yōu)先級
- 保證至少執(zhí)行一次
- ...
整體結(jié)構(gòu)如下:
系統(tǒng)分為四個主要部分,從下到上,依次為:
- 任務(wù)的執(zhí)行體,完成任務(wù)的主要功能,實現(xiàn)為單個rpc方法,方便進行獨立部署,或者手動執(zhí)行指定次數(shù)時進行單獨調(diào)用的邏輯實現(xiàn)
- Worker服務(wù)節(jié)點,該節(jié)點單獨部署,對所有調(diào)度的任務(wù)進行執(zhí)行
- Schedule服務(wù)節(jié)點,該節(jié)點也是單獨部署,對Cronweb管理的所有任務(wù)進行同步
- Cronweb節(jié)點,任務(wù)的信息管理能力,可手動停止任務(wù)的執(zhí)行,可調(diào)整任務(wù)的執(zhí)行cron表達式,可定制任務(wù)執(zhí)行的超時重啟次數(shù)等等信息
最終,在調(diào)研了大多數(shù)任務(wù)管理的庫后,選擇了asynq.選擇什么庫,亦或是自己造輪子,都是根據(jù)業(yè)務(wù)需求來定的,以能滿足功能需求為首要標(biāo)準(zhǔn)。
Asynq是一個用于排隊任務(wù)并與woker異步處理的庫。它由Redis支持,設(shè)計為可擴展但易于啟動。
大致概述Asynq的工作方式:
- Client將任務(wù)放在隊列上
- Server從隊列中刪除任務(wù),并為每個任務(wù)啟動一個woker goroutine
- 任務(wù)是由多名woker同時處理的
任務(wù)隊列被用作跨多個計算機分配工作的機制。系統(tǒng)可以由多個worker servers 和brokers組成,使位于高可用性和水平規(guī)模。
上面兩段話為asynq的簡單介紹,剛看起來可能會有點抽象,那接下來,我們先來詳細的介紹一下這個庫,然后看看我們是如何對它進行封裝和調(diào)整的。
看一個簡單的實例
package main import ( "log" "github.com/hibiken/asynq" "your/app/package/tasks" ) const redisAddr = "127.0.0.1:6379" func main() { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: redisAddr}, asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, // Optionally specify multiple queues with different priority. Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, // See the godoc for other configuration options }, ) // mux maps a type to a handler mux := asynq.NewServeMux() mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask) mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor()) // ...register other handlers... if err := srv.Run(mux); err != nil { log.Fatalf("could not run server: %v", err) } }
上面這塊代碼實現(xiàn)了一個簡單的server,可以看到,
- asynq庫深度使用redis組件,所以你的系統(tǒng)如何沒有接入redis那就要重新考慮了,當(dāng)然現(xiàn)在的系統(tǒng)中redis的使用還是很普遍的。繼續(xù)看,實例化Server時,可以指定有多少個并發(fā)任務(wù)來處理調(diào)度的task.此項可根據(jù)運行調(diào)整進行調(diào)整。
- 另外的Queues是隊列的配置,此處提供了三個隊列,這里的隊列你可以根據(jù)自己的需要進行調(diào)整,值為隊列的優(yōu)先級。
- 此處需要注意的是,如果你指定了嚴(yán)格模式時,低優(yōu)先級的任務(wù),只有等到高優(yōu)先級的任務(wù)執(zhí)行完成后,才能執(zhí)行。如果高優(yōu)先級的隊列中一直有任務(wù),那么低優(yōu)先級的任務(wù)可能會得不到執(zhí)行的機會。默認的嚴(yán)格模式并沒有打開,如果你需要此模式,進需要提供
Config
結(jié)構(gòu)體中StrictPriority
屬性設(shè)置為true
. - 然后就是類似于http server中的路由設(shè)置,你可在這里指定需要的不同任務(wù)
- 最后啟動了一個asynq的Server
任務(wù)的執(zhí)行邏輯
package tasks import ( "context" "encoding/json" "fmt" "log" "time" "github.com/hibiken/asynq" ) // A list of task types. const ( TypeEmailDelivery = "email:deliver" TypeImageResize = "image:resize" ) type EmailDeliveryPayload struct { UserID int TemplateID string } type ImageResizePayload struct { SourceURL string } //---------------------------------------------- // Write a function NewXXXTask to create a task. // A task consists of a type and a payload. //---------------------------------------------- func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) { payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID}) if err != nil { return nil, err } return asynq.NewTask(TypeEmailDelivery, payload), nil } func NewImageResizeTask(src string) (*asynq.Task, error) { payload, err := json.Marshal(ImageResizePayload{SourceURL: src}) if err != nil { return nil, err } // task options can be passed to NewTask, which can be overridden at enqueue time. return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil } //--------------------------------------------------------------- // Write a function HandleXXXTask to handle the input task. // Note that it satisfies the asynq.HandlerFunc interface. // // Handler doesn't need to be a function. You can define a type // that satisfies asynq.Handler interface. See examples below. //--------------------------------------------------------------- func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { var p EmailDeliveryPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID) // Email delivery code ... return nil } // ImageProcessor implements asynq.Handler interface. type ImageProcessor struct { // ... fields for struct } func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { var p ImageResizePayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } log.Printf("Resizing image: src=%s", p.SourceURL) // Image resizing code ... return nil } func NewImageProcessor() *ImageProcessor { return &ImageProcessor{} }
上面的代碼定義了不同的任務(wù)類型,以及任務(wù)在調(diào)度過程中的Payload信息。這個payload是一個可以利用的點,在你需要實現(xiàn)更加高級的控制能力的時候,這里我們先簡單的看看如何使用的即可。
NewEmailDeliveryTask
和
NewImageResizeTask
其實就是定義了兩個不同的任務(wù)類型,在實現(xiàn)時,你可以指定不同的屬性,比如最大的重試次數(shù),以及單個任務(wù)的實例的超時時間等等。
接下來的兩段代碼,我摘出來著重說一下:
//--------------------------------------------------------------- // Write a function HandleXXXTask to handle the input task. // Note that it satisfies the asynq.HandlerFunc interface. // // Handler doesn't need to be a function. You can define a type // that satisfies asynq.Handler interface. See examples below. //--------------------------------------------------------------- func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { var p EmailDeliveryPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID) // Email delivery code ... return nil } // ImageProcessor implements asynq.Handler interface. type ImageProcessor struct { // ... fields for struct } func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { var p ImageResizePayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } log.Printf("Resizing image: src=%s", p.SourceURL) // Image resizing code ... return nil } func NewImageProcessor() *ImageProcessor { return &ImageProcessor{} }
這其實就是說明了如何將一個task對象轉(zhuǎn)換為可以在Server中進行路由注冊的工具方法。你可以把它當(dāng)做是http HandlerFunc
一樣對待,當(dāng)然了,你也可以使用第二種方式,自己定義一個結(jié)構(gòu)體,然后實現(xiàn)ProcessTask
方法.
Client如何讓任務(wù)進行調(diào)度的,三種不同場景下的使用方式
package main import ( "log" "time" "github.com/hibiken/asynq" "your/app/package/tasks" ) const redisAddr = "127.0.0.1:6379" func main() { client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr}) defer client.Close() // ------------------------------------------------------ // Example 1: Enqueue task to be processed immediately. // Use (*Client).Enqueue method. // ------------------------------------------------------ task, err := tasks.NewEmailDeliveryTask(42, "some:template:id") if err != nil { log.Fatalf("could not create task: %v", err) } info, err := client.Enqueue(task) if err != nil { log.Fatalf("could not enqueue task: %v", err) } log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) // ------------------------------------------------------------ // Example 2: Schedule task to be processed in the future. // Use ProcessIn or ProcessAt option. // ------------------------------------------------------------ info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour)) if err != nil { log.Fatalf("could not schedule task: %v", err) } log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) // ---------------------------------------------------------------------------- // Example 3: Set other options to tune task processing behavior. // Options include MaxRetry, Queue, Timeout, Deadline, Unique etc. // ---------------------------------------------------------------------------- task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg") if err != nil { log.Fatalf("could not create task: %v", err) } info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute)) if err != nil { log.Fatalf("could not enqueue task: %v", err) } log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue) }
上面這部分代碼實現(xiàn)了三種場景下的使用方法,你可以立即調(diào)用,也可以在未來的某個時間點調(diào)用,還可以更加詳盡的控制任務(wù)執(zhí)行。
上面的代碼僅僅是關(guān)于asynq的簡單的一個介紹。在生產(chǎn)環(huán)境下,如何使用呢,一般情況下,我們會提供一個provider.provider來提供配置的源,源可以是文件,也可以是Mysql還可以是其他存儲源,最重要的是需要實現(xiàn)對應(yīng)的方法。下面一個文件源為例來說明如何實現(xiàn)一個源
// FileBasedConfigProvider implements asynq.PeriodicTaskConfigProvider interface. type FileBasedConfigProvider struct { ? ? ? ? filename string } type PeriodicTaskConfigContainer struct { ? ? ? ? Configs []*Config `yaml:"configs"` } type Config struct { ? ? ? ? Cronspec string `yaml:"cronspec"` ? ? ? ? TaskType string `yaml:"task_type"` } // Parses the yaml file and return a list of PeriodicTaskConfigs. func (p *FileBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) { ? ? ? ? data, err := os.ReadFile(p.filename) ? ? ? ? if err != nil { ? ? ? ? ? ? ? ? return nil, err ? ? ? ? } ? ? ? ? var c PeriodicTaskConfigContainer ? ? ? ? if err := yaml.Unmarshal(data, &c); err != nil { ? ? ? ? ? ? ? ? return nil, err ? ? ? ? } ? ? ? ? var configs []*asynq.PeriodicTaskConfig ? ? ? ? for _, cfg := range c.Configs { ? ? ? ? ? ? ? ? configs = append(configs, &asynq.PeriodicTaskConfig{Cronspec: cfg.Cronspec, Task: asynq.NewTask(cfg.TaskType, nil)}) ? ? ? ? } ? ? ? ? return configs, nil }
如何使用
provider := &FileBasedConfigProvider{filename: "./periodic_task_config.yml"} ? ? ? ? mgr, err := asynq.NewPeriodicTaskManager( ? ? ? ? ? ? ? ? asynq.PeriodicTaskManagerOpts{ ? ? ? ? ? ? ? ? ? ? ? ? RedisConnOpt: asynq.RedisClientOpt{ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Addr: ? ? "127.0.0.1:6379", ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Password: "123456", ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? DB: ? ? ? 1, ? ? ? ? ? ? ? ? ? ? ? ? }, ? ? ? ? ? ? ? ? ? ? ? ? PeriodicTaskConfigProvider: provider, ? ? ? ? // this provider object is the interface to your config source ? ? ? ? ? ? ? ? ? ? ? ? SyncInterval: ? ? ? ? ? ? ? 10 * time.Second, // this field specifies how often sync should happen ? ? ? ? ? ? ? ? }) ? ? ? ? if err != nil { ? ? ? ? ? ? ? ? log.Fatal(err) ? ? ? ? } ? ? ? ? if err := mgr.Run(); err != nil { ? ? ? ? ? ? ? ? log.Fatal(err) ? ? ? ? }
在任務(wù)調(diào)度的邏輯中,我們一般會實現(xiàn)一個調(diào)度器來進行任務(wù)調(diào)度,而不是單個任務(wù)進行獨立的調(diào)度,例如:
? ? ? ? loc, err := time.LoadLocation("Asia/Shanghai") ? ? ? ? if err != nil { panic(err) } ? ? ? ? scheduler := asynq.NewScheduler( ? ? ? ? ? ? ? asynq.RedisClientOpt{ ? ? ? ? ? ? ? ? ? ? ? Addr: ? ? "127.0.0.1:6379", ? ? ? ? ? ? ? ? ? ? ? Password: "123456", ? ? ? ? ? ? ? ? ? ? ? DB: ? ? ? 1, ? ? ? ? ? ? ? }, ? ? ? ? ? ? ? &asynq.SchedulerOpts{ ? ? ? ? ? ? ? ? ? ? ? Location: loc, ? ? ? ? ? ? ? }, ? ? ? ? ) ? ? ? ? task := asynq.NewTask("example_task", nil) ? ? ? ? // You can use cron spec string to specify the schedule. ? ? ? ? entryID, err := scheduler.Register("*/1 * * * *", task) ? ? ? ? if err != nil { ? ? ? ? ? ? log.Fatal(err) ? ? ? ? } ? ? ? ? fmt.Println(entryID) ? ? ? ? if err := scheduler.Run(); err != nil { ? ? ? ? ? ? ? log.Fatal(err) ? ? ? ? }
該庫還提供了一個工具用于監(jiān)控任務(wù)的運行情況。Asynqmon是一個基于web的工具,用于監(jiān)控和管理Asynq隊列和任務(wù)。下面是Web UI的一些截圖
以上就是asynq的全部介紹,下面,看看我對任務(wù)系統(tǒng)的改動
首先,增加任務(wù)的注冊和發(fā)現(xiàn)
- 客戶端在啟動時,注冊節(jié)點信息到etcd,并且注冊該rpc服務(wù)器提供的所有遠程調(diào)用方法,供web頁面建立cron任務(wù)時,進行選擇執(zhí)行相應(yīng)的定時方法,如果采用任務(wù)對列不需要提供調(diào)用方法,框架自己提供
- 調(diào)度器定期拉取etcd注冊信息,維護web端和客戶端的rpc連接池,方便web端調(diào)度器進行rpc連接和rpc調(diào)用
訂閱任務(wù)取消的信號,對任務(wù)取消的信號進行處理,在頁面增加任務(wù)啟停的功能
pubsub := AsyncClient.Subscribe(context.Background(), "asynq:cancel") cancelCh := pubsub.Channel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() now := time.Now() ch1 := make(chan int) stop := make(chan bool) go func(ctx context.Context, xxx string) { err := xxxx if err != nil { ch1 <- 2 logx.Error(fmt.Sprintf("本次定時任務(wù)執(zhí)行失敗,失敗原因: %s", err)) return } for i := 0; i < 100; i++ { select { default: time.Sleep(1 * time.Second) logx.Info("我是任務(wù)主流程,我在運行中...") case <-stop: logx.Info("收到退出信號,任務(wù)結(jié)束...") return } } usedTime := time.Since(now) msg := fmt.Sprintf("本次定時任務(wù)成功結(jié)束執(zhí)行,用時: %f秒", usedTime.Seconds()) // Signal the goroutine has completed ch1 <- 1 }(ctx, in.Args) for { select { case cd := <-ch1: if cd == 1 { return &CommonReply{Message: "本次定時任務(wù)成功結(jié)束執(zhí)行"}, nil } else { return &CommonReply{Message: "本次定時任務(wù)執(zhí)行失敗"}, nil } case msg := <-cancelCh: if msg.Payload == in.TaskId { // stop <- true respMsg := fmt.Sprintf("任務(wù)[%s]已被取消.Exist.", msg.Payload) logx.Info(respMsg) return &CommonReply{Message: respMsg}, nil } } }
針對自身系統(tǒng)的功能需求,可以對asynq庫已有的功能進行更多的豐富和改進,同時,你也可以借鑒其他優(yōu)秀的庫,對現(xiàn)有的庫進行豐富和改造。
以上就是基于Golang設(shè)計一套可控的定時任務(wù)系統(tǒng)的詳細內(nèi)容,更多關(guān)于Golang定時任務(wù)系統(tǒng)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang基礎(chǔ)學(xué)習(xí)之map的示例詳解
哈希表是常見的數(shù)據(jù)結(jié)構(gòu),有的語言會將哈希稱作字典或者映射,在Go中,哈希就是常見的數(shù)據(jù)類型map,本文就來聊聊Golang中map的相關(guān)知識吧2023-03-03手把手教你如何在Goland中創(chuàng)建和運行項目
歡迎來到本指南!我們將手把手地教您在Goland中如何創(chuàng)建、配置并運行項目,通過簡單的步驟,您將迅速上手這款強大的集成開發(fā)環(huán)境(IDE),輕松實現(xiàn)您的編程夢想,讓我們一起開啟這段精彩的旅程吧!2024-02-02golang實現(xiàn)unicode轉(zhuǎn)換為字符串string的方法
這篇文章主要介紹了golang實現(xiàn)unicode轉(zhuǎn)換為字符串string的方法,實例分析了Go語言編碼轉(zhuǎn)換的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下2016-07-07Golang import本地包和導(dǎo)入問題相關(guān)詳解
這篇文章主要介紹了Golang import本地包和導(dǎo)入問題相關(guān)詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02Go語言crypto包創(chuàng)建自己的密碼加密工具實現(xiàn)示例
Go語言借助它的簡單性和強大的標(biāo)準(zhǔn)庫,實現(xiàn)一個自己的密碼加密工具,本文將會結(jié)合代碼示例深入探討如何使用Go語言的crypto包來實現(xiàn)自己的加密工具2023-11-11