go-zero數(shù)據(jù)的流處理利器fx使用詳解
數(shù)據(jù)的流處理利器
go-zero微服務(wù)庫(kù)地址https://github.com/tal-tech/go-zero
流處理(Stream processing)是一種計(jì)算機(jī)編程范式,其允許給定一個(gè)數(shù)據(jù)序列(流處理數(shù)據(jù)源),一系列數(shù)據(jù)操作(函數(shù))被應(yīng)用到流中的每個(gè)元素。同時(shí)流處理工具可以顯著提高程序員的開(kāi)發(fā)效率,允許他們編寫(xiě)有效、干凈和簡(jiǎn)潔的代碼。
流數(shù)據(jù)處理在我們的日常工作中非常常見(jiàn),舉個(gè)例子,我們?cè)跇I(yè)務(wù)開(kāi)發(fā)中往往會(huì)記錄許多業(yè)務(wù)日志,這些日志一般是先發(fā)送到Kafka,然后再由Job消費(fèi)Kafaka寫(xiě)到elasticsearch,在進(jìn)行日志流處理的過(guò)程中,往往還會(huì)對(duì)日志做一些處理,比如過(guò)濾無(wú)效的日志,做一些計(jì)算以及重新組合日志等等,示意圖如下:
流處理工具fx
gozero是一個(gè)功能完備的微服務(wù)框架,框架中內(nèi)置了很多非常實(shí)用的工具,其中就包含流數(shù)據(jù)處理工具fx,下面我們通過(guò)一個(gè)簡(jiǎn)單的例子來(lái)認(rèn)識(shí)下該工具:
package main import ( "fmt" "os" "os/signal" "syscall" "time" "github.com/tal-tech/go-zero/core/fx" ) func main() { ch := make(chan int) go inputStream(ch) go outputStream(ch) c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) <-c } func inputStream(ch chan int) { count := 0 for { ch <- count time.Sleep(time.Millisecond * 500) count++ } } func outputStream(ch chan int) { fx.From(func(source chan<- interface{}) { for c := range ch { source <- c } }).Walk(func(item interface{}, pipe chan<- interface{}) { count := item.(int) pipe <- count }).Filter(func(item interface{}) bool { itemInt := item.(int) if itemInt%2 == 0 { return true } return false }).ForEach(func(item interface{}) { fmt.Println(item) }) }
inputStream函數(shù)模擬了流數(shù)據(jù)的產(chǎn)生,outputStream函數(shù)模擬了流數(shù)據(jù)的處理過(guò)程,其中From函數(shù)為流的輸入,Walk函數(shù)并發(fā)的作用在每一個(gè)item上,F(xiàn)ilter函數(shù)對(duì)item進(jìn)行過(guò)濾為true保留為false不保留,F(xiàn)orEach函數(shù)遍歷輸出每一個(gè)item元素。
流數(shù)據(jù)處理中間操作
一個(gè)流的數(shù)據(jù)處理可能存在許多的中間操作,每個(gè)中間操作都可以作用在流上。就像流水線上的工人一樣,每個(gè)工人操作完零件后都會(huì)返回處理完成的新零件,同理流處理中間操作完成后也會(huì)返回一個(gè)新的流。
fx的流處理中間操作:
操作函數(shù) | 功能 | 輸入 |
---|---|---|
Distinct | 去除重復(fù)的item | KeyFunc,返回需要去重的key |
Filter | 過(guò)濾不滿足條件的item | FilterFunc,Option控制并發(fā)量 |
Group | 對(duì)item進(jìn)行分組 | KeyFunc,以key進(jìn)行分組 |
Head | 取出前n個(gè)item,返回新stream | int64保留數(shù)量 |
Map | 對(duì)象轉(zhuǎn)換 | MapFunc,Option控制并發(fā)量 |
Merge | 合并item到slice并生成新stream | |
Reverse | 反轉(zhuǎn)item | |
Sort | 對(duì)item進(jìn)行排序 | LessFunc實(shí)現(xiàn)排序算法 |
Tail | 與Head功能類(lèi)似,取出后n個(gè)item組成新stream | int64保留數(shù)量 |
Walk | 作用在每個(gè)item上 | WalkFunc,Option控制并發(fā)量 |
下圖展示了每個(gè)步驟和每個(gè)步驟的結(jié)果:
用法與原理分析
From
通過(guò)From函數(shù)構(gòu)建流并返回Stream,流數(shù)據(jù)通過(guò)channel進(jìn)行存儲(chǔ):
// 例子 s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} fx.From(func(source chan<- interface{}) { for _, v := range s { source <- v } }) // 源碼 func From(generate GenerateFunc) Stream { source := make(chan interface{}) go func() { defer close(source) // 構(gòu)造流數(shù)據(jù)寫(xiě)入channel generate(source) }() return Range(source) }
Filter
Filter函數(shù)提供過(guò)濾item的功能,F(xiàn)ilterFunc定義過(guò)濾邏輯true保留item,false則不保留:
// 例子 保留偶數(shù) s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} fx.From(func(source chan<- interface{}) { for _, v := range s { source <- v } }).Filter(func(item interface{}) bool { if item.(int)%2 == 0 { return true } return false }) // 源碼 func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream { return p.Walk(func(item interface{}, pipe chan<- interface{}) { // 執(zhí)行過(guò)濾函數(shù)true保留,false丟棄 if fn(item) { pipe <- item } }, opts...) }
Group
Group對(duì)流數(shù)據(jù)進(jìn)行分組,需定義分組的key,數(shù)據(jù)分組后以slice存入channel:
// 例子 按照首字符"g"或者"p"分組,沒(méi)有則分到另一組 ss := []string{"golang", "google", "php", "python", "java", "c++"} fx.From(func(source chan<- interface{}) { for _, s := range ss { source <- s } }).Group(func(item interface{}) interface{} { if strings.HasPrefix(item.(string), "g") { return "g" } else if strings.HasPrefix(item.(string), "p") { return "p" } return "" }).ForEach(func(item interface{}) { fmt.Println(item) }) } // 源碼 func (p Stream) Group(fn KeyFunc) Stream { // 定義分組存儲(chǔ)map groups := make(map[interface{}][]interface{}) for item := range p.source { // 用戶(hù)自定義分組key key := fn(item) // key相同分到一組 groups[key] = append(groups[key], item) } source := make(chan interface{}) go func() { for _, group := range groups { // 相同key的一組數(shù)據(jù)寫(xiě)入到channel source <- group } close(source) }() return Range(source) }
Reverse
reverse可以對(duì)流中元素進(jìn)行反轉(zhuǎn)處理:
// 例子 fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) { fmt.Println(item) }) // 源碼 func (p Stream) Reverse() Stream { var items []interface{} // 獲取流中數(shù)據(jù) for item := range p.source { items = append(items, item) } // 反轉(zhuǎn)算法 for i := len(items)/2 - 1; i >= 0; i-- { opp := len(items) - 1 - i items[i], items[opp] = items[opp], items[i] } // 寫(xiě)入流 return Just(items...) }
Distinct
distinct對(duì)流中元素進(jìn)行去重,去重在業(yè)務(wù)開(kāi)發(fā)中比較常用,經(jīng)常需要對(duì)用戶(hù)id等做去重操作:
// 例子 fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} { return item }).ForEach(func(item interface{}) { fmt.Println(item) }) // 結(jié)果為 1,2,3,4,5,6 // 源碼 func (p Stream) Distinct(fn KeyFunc) Stream { source := make(chan interface{}) threading.GoSafe(func() { defer close(source) // 通過(guò)key進(jìn)行去重,相同key只保留一個(gè) keys := make(map[interface{}]lang.PlaceholderType) for item := range p.source { key := fn(item) // key存在則不保留 if _, ok := keys[key]; !ok { source <- item keys[key] = lang.Placeholder } } }) return Range(source) }
Walk
Walk函數(shù)并發(fā)的作用在流中每一個(gè)item上,可以通過(guò)WithWorkers設(shè)置并發(fā)數(shù),默認(rèn)并發(fā)數(shù)為16,最小并發(fā)數(shù)為1,如設(shè)置unlimitedWorkers為true則并發(fā)數(shù)無(wú)限制,但并發(fā)寫(xiě)入流中的數(shù)據(jù)由defaultWorkers限制,WalkFunc中用戶(hù)可以自定義后續(xù)寫(xiě)入流中的元素,可以不寫(xiě)入也可以寫(xiě)入多個(gè)元素:
// 例子 fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) { newItem := strings.ToUpper(item.(string)) pipe <- newItem }).ForEach(func(item interface{}) { fmt.Println(item) }) // 源碼 func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, option.workers) go func() { var wg sync.WaitGroup pool := make(chan lang.PlaceholderType, option.workers) for { // 控制并發(fā)數(shù)量 pool <- lang.Placeholder item, ok := <-p.source if !ok { <-pool break } wg.Add(1) go func() { defer func() { wg.Done() <-pool }() // 作用在每個(gè)元素上 fn(item, pipe) }() } // 等待處理完成 wg.Wait() close(pipe) }() return Range(pipe) }
并發(fā)處理
fx工具除了進(jìn)行流數(shù)據(jù)處理以外還提供了函數(shù)并發(fā)功能,在微服務(wù)中實(shí)現(xiàn)某個(gè)功能往往需要依賴(lài)多個(gè)服務(wù),并發(fā)的處理依賴(lài)可以有效的降低依賴(lài)耗時(shí),提升服務(wù)的性能。
fx.Parallel(func() { userRPC() // 依賴(lài)1 }, func() { accountRPC() // 依賴(lài)2 }, func() { orderRPC() // 依賴(lài)3 })
注意fx.Parallel進(jìn)行依賴(lài)并行處理的時(shí)候不會(huì)有error返回,如需有error返回或者有一個(gè)依賴(lài)報(bào)錯(cuò)需要立馬結(jié)束依賴(lài)請(qǐng)求請(qǐng)使用MapReduce工具進(jìn)行處理。
總結(jié)
本篇文章介紹了流處理的基本概念和gozero中的流處理工具fx,在實(shí)際的生產(chǎn)中流處理場(chǎng)景應(yīng)用也非常多,希望本篇文章能給大家?guī)?lái)一定的啟發(fā),更好的應(yīng)對(duì)工作中的流處理場(chǎng)景。
以上就是go-zero數(shù)據(jù)的流處理利器fx使用詳解的詳細(xì)內(nèi)容,更多關(guān)于go-zero數(shù)據(jù)流處理fx的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺析Golang中類(lèi)型嵌入的簡(jiǎn)介與使用
類(lèi)型嵌入指的就是在一個(gè)類(lèi)型的定義中嵌入了其他類(lèi)型,Go?語(yǔ)言支持兩種類(lèi)型嵌入,分別是接口類(lèi)型的類(lèi)型嵌入和結(jié)構(gòu)體類(lèi)型的類(lèi)型嵌入,下面我們就來(lái)詳細(xì)一下類(lèi)型嵌入的使用吧2023-11-11golang sql語(yǔ)句超時(shí)控制方案及原理
一般應(yīng)用程序在執(zhí)行一條sql語(yǔ)句時(shí),都會(huì)給這條sql設(shè)置一個(gè)超時(shí)時(shí)間,本文主要介紹了golang sql語(yǔ)句超時(shí)控制方案及原理,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12Golang比較兩個(gè)slice是否相等的問(wèn)題
本文主要介紹了Golang比較兩個(gè)slice是否相等的問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03gin項(xiàng)目部署到服務(wù)器并后臺(tái)啟動(dòng)的步驟
本文主要介紹了gin項(xiàng)目部署到服務(wù)器并后臺(tái)啟動(dòng)的步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02