亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

探索Golang?Redis實(shí)現(xiàn)發(fā)布訂閱功能實(shí)例

 更新時(shí)間:2024年01月24日 09:26:17   作者:紹納?nullbody筆記  
這篇文章主要介紹了Golang?Redis發(fā)布訂閱功能實(shí)例探索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

用11篇文章實(shí)現(xiàn)一個(gè)可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過(guò)文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺(jué),歡迎持續(xù)關(guān)注學(xué)習(xí)。

[x] easyredis之TCP服務(wù)

[x] easyredis之網(wǎng)絡(luò)請(qǐng)求序列化協(xié)議(RESP)

[x] easyredis之內(nèi)存數(shù)據(jù)庫(kù)

[x] easyredis之過(guò)期時(shí)間 (時(shí)間輪實(shí)現(xiàn))

[x] easyredis之持久化 (AOF實(shí)現(xiàn))

[x] easyredis之發(fā)布訂閱功能

[ ] easyredis之有序集合(跳表實(shí)現(xiàn))

[ ] easyredis之 pipeline 客戶端實(shí)現(xiàn)

[ ] easyredis之事務(wù)(原子性/回滾)

[ ] easyredis之連接池

[ ] easyredis之分布式集群存儲(chǔ)

EasyRedis之發(fā)布訂閱

代碼路徑: pubhub/pubhub.go這個(gè)代碼很簡(jiǎn)單,總共就200行

發(fā)布訂閱的基本原理:客戶端A/B/C訂閱通道,客戶端D往通道中發(fā)送消息后,客戶端A/B/C可以接收到通道中的消息

效果演示:

底層實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)采用map + list,map中的key表示channel,value則用list來(lái)存儲(chǔ)同一個(gè)channel下的多個(gè)客戶端clientN

type Pubhub struct {

	// 自定義實(shí)現(xiàn)的map
	dataDict dict.ConcurrentDict

	// 該鎖的顆粒度太大
	//locker sync.RWMutex

	locker *locker.Locker // 自定義一個(gè)分布鎖
}
  • dataDict就是我們自己實(shí)現(xiàn)的map

  • locker 用來(lái)對(duì)操作同一個(gè)鏈表的不同客戶端加鎖,避免并發(fā)問(wèn)題

訂閱Subscribe

  • 獲取客戶端發(fā)送來(lái)的通道名

  • 加鎖(鎖的原理看文章最后)

  • 遍歷通道,獲取該通道下的客戶端鏈表

  • 將當(dāng)前的客戶端加入到鏈表中即可(前提:沒(méi)有訂閱過(guò))

// SUBSCRIBE channel [channel ...]
func (p *Pubhub) Subscribe(c abstract.Connection, args [][]byte) protocol.Reply {
	iflen(args) < 1 {
		return protocol.NewArgNumErrReply("subscribe")
	}
	// 通道名
	keys := make([]string, 0, len(args))
	for _, arg := range args {
		keys = append(keys, string(arg))
	}
	// 加鎖
	p.locker.Locks(keys...)
	defer p.locker.Unlocks(keys...)
	for _, arg := range args {
		chanName := string(arg)
		// 記錄當(dāng)前客戶端連接訂閱的通道
		c.Subscribe(chanName)
		// 雙向鏈表,記錄通道下的客戶端連接
		var l *list.LinkedList
		raw, exist := p.dataDict.Get(chanName)
		if !exist { // 說(shuō)明該channel第一次使用
			l = list.NewLinkedList()
			p.dataDict.Put(chanName, l)
		} else {
			l, _ = raw.(*list.LinkedList)
		}
		// 未訂閱
		if !l.Contain(func(actual interface{}) bool {
			return c == actual
		}) {
			// 如果不重復(fù),那就記錄訂閱
			logger.Debug("subscribe channel [" + chanName + "] success")
			l.Add(c)
		}
		// 回復(fù)客戶端消息
		_, err := c.Write(channelMsg(_subscribe, chanName, c.SubCount()))
		if err != nil {
			logger.Warn(err)
		}
	}
	return protocol.NewNoReply()
}

取消訂閱 Unsubscribe

  • 獲取通道名(如果沒(méi)有指定,就是取消當(dāng)前客戶端的所有通道)

  • 加鎖(鎖的原理看文章最后)

  • 獲取該通道下的客戶端鏈表

  • 從鏈表中刪除當(dāng)前的客戶端

// 取消訂閱
// unsubscribes itself from all the channels using the UNSUBSCRIBE command without additional arguments
func (p *Pubhub) Unsubscribe(c abstract.Connection, args [][]byte) protocol.Reply {
	var channels []string
	iflen(args) < 1 { // 取消全部
		channels = c.GetChannels()
	} else { // 取消指定channel
		channels = make([]string, len(args))
		for i, v := range args {
			channels[i] = string(v)
		}
	}
	p.locker.Locks(channels...)
	defer p.locker.Unlocks(channels...)
	// 說(shuō)明已經(jīng)沒(méi)有訂閱的通道
	iflen(channels) == 0 {
		c.Write(noChannelMsg())
	}
	for _, channel := range channels {
		// 從客戶端中刪除當(dāng)前通道
		c.Unsubscribe(channel)
		// 獲取鏈表
		raw, ok := p.dataDict.Get(channel)
		if ok {
			// 從鏈表中刪除當(dāng)前客戶端
			l, _ := raw.(*list.LinkedList)
			l.DelAllByVal(func(actual interface{}) bool {
				return c == actual
			})
			// 如果鏈表為空,清理map
			if l.Len() == 0 {
				p.dataDict.Delete(channel)
			}
		}
		c.Write(channelMsg(_unsubscribe, channel, c.SubCount()))
	}
	return protocol.NewNoReply()
}

發(fā)布 publish

  • 獲取客戶端的channel

  • 從map將channel作為key得到客戶端鏈表

  • 對(duì)鏈表的所有客戶端發(fā)送數(shù)據(jù)即可

func (p *Pubhub) Publish(self abstract.Connection, args [][]byte) protocol.Reply {
	iflen(args) != 2 {
		return protocol.NewArgNumErrReply("publish")
	}
	channelName := string(args[0])
	// 加鎖
	p.locker.Locks(channelName)
	defer p.locker.Unlocks(channelName)
	raw, ok := p.dataDict.Get(channelName)
	if ok {
		var sendSuccess int64
		var failedClient = make(map[interface{}]struct{})
		// 取出鏈表
		l, _ := raw.(*list.LinkedList)
		// 遍歷鏈表
		l.ForEach(func(i int, val interface{}) bool {
			conn, _ := val.(abstract.Connection)
			if conn.IsClosed() {
				failedClient[val] = struct{}{}
				returntrue
			}
			if val == self { //不給自己發(fā)送
				returntrue
			}
			// 發(fā)送數(shù)據(jù)
			conn.Write(publisMsg(channelName, string(args[1])))
			sendSuccess++
			returntrue
		})
		// 剔除客戶端
		iflen(failedClient) > 0 {
			removed := l.DelAllByVal(func(actual interface{}) bool {
				_, ok := failedClient[actual]
				return ok
			})
			logger.Debugf("del %d closed client", removed)
		}
		// 返回發(fā)送的客戶端數(shù)量
		return protocol.NewIntegerReply(sendSuccess)
	}
	// 如果channel不存在
	return protocol.NewIntegerReply(0)
}

鎖的原理

代碼路徑 tool/locker/locker.go

type Pubhub struct {
	// 自定義實(shí)現(xiàn)的map
	dataDict dict.ConcurrentDict
	// 該鎖的顆粒度太大
	//locker sync.RWMutex
	locker *locker.Locker // 自定義一個(gè)分布鎖
}

在結(jié)構(gòu)體中,當(dāng)有【多個(gè)客戶端同時(shí)訂閱不同的通道】,通過(guò)通道名,可以獲取到不同的客戶端鏈表,也就是不同的客戶端操作不同的鏈表可以并行操作(只有操作同一個(gè)鏈表才是互斥),如果我們使用 locker sync.RWMutex 鎖,那就是所有的客戶端持有同一把鎖,一個(gè)客戶端只有操作完成一個(gè)鏈表,才能允許另一個(gè)客戶端操作另外一個(gè)鏈表,整個(gè)操作只能是串行的。所以我們需要實(shí)現(xiàn)一個(gè)顆粒度更小的鎖

通過(guò)不同的通道名,加不同的鎖即可(盡可能的減小鎖的粒度),同時(shí)為了避免死鎖,并行的協(xié)程加鎖的順序要一致。所以代碼中有個(gè)排序。

這里做了一個(gè)技巧,通過(guò)hash將通道名映射成不同的hash值,再通過(guò)取余,將鎖固定在一個(gè)范圍內(nèi)(將無(wú)限多的channel名 轉(zhuǎn)成 有限范圍的值),所以可能存在不同的通道名取余的結(jié)果,用的同一個(gè)鎖

type Locker struct {
	mu []*sync.RWMutex
	mask uint32
}
// 順序加鎖(互斥)
func (l *Locker) Locks(keys ...string) {
	indexs := l.toLockIndex(keys...)
	for _, index := range indexs {
		mu := l.mu[index]
		mu.Lock()
	}
}
// 順序解鎖(互斥)
func (l *Locker) Unlocks(keys ...string) {
	indexs := l.toLockIndex(keys...)
	for _, index := range indexs {
		mu := l.mu[index]
		mu.Unlock()
	}
}
func (l *Locker) toLockIndex(keys ...string) []uint32 {
	// 將key轉(zhuǎn)成 切片索引[0,mask]
	mapIndex := make(map[uint32]struct{}) // 去重
	for _, key := range keys {
		mapIndex[l.spread(utils.Fnv32(key))] = struct{}{}
	}
	indices := make([]uint32, 0, len(mapIndex))
	for k := range mapIndex {
		indices = append(indices, k)
	}
	// 對(duì)索引排序
	sort.Slice(indices, func(i, j int) bool {
		return indices[i] < indices[j]
	})
	return indices
}

總結(jié)

鎖相關(guān)的代碼很有實(shí)踐意義,建議大家自己的手動(dòng)敲一下,平時(shí)工作中作為自己的代碼小組件使用,絕對(duì)可以讓人眼前一亮。

項(xiàng)目代碼地址: https://github.com/gofish2020/easyredis

以上就是探索Golang Redis發(fā)布訂閱功能實(shí)例的詳細(xì)內(nèi)容,更多關(guān)于Golang Redis發(fā)布訂閱的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Go語(yǔ)言常用條件判斷空值邏輯的使用

    Go語(yǔ)言常用條件判斷空值邏輯的使用

    本文主要介紹了Go語(yǔ)言常用條件判斷空值邏輯的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • Go結(jié)構(gòu)體指針引發(fā)的值傳遞思考分析

    Go結(jié)構(gòu)體指針引發(fā)的值傳遞思考分析

    這篇文章主要為大家介紹了Go結(jié)構(gòu)體指針引發(fā)的值傳遞思考分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • Go語(yǔ)言為什么很少使用數(shù)組原理解析

    Go語(yǔ)言為什么很少使用數(shù)組原理解析

    這篇文章主要為大家介紹了Go語(yǔ)言為什么很少使用數(shù)組原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • go語(yǔ)言使用io和bufio包進(jìn)行流操作示例詳解

    go語(yǔ)言使用io和bufio包進(jìn)行流操作示例詳解

    這篇文章主要為大家介紹了go語(yǔ)言使用io和bufio包進(jìn)行流操作示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • 一文帶你了解Go語(yǔ)言如何解析JSON

    一文帶你了解Go語(yǔ)言如何解析JSON

    本文將說(shuō)明如何利用?Go?語(yǔ)言將?JSON?解析為結(jié)構(gòu)體和數(shù)組,如果解析?JSON?的嵌入對(duì)象,如何將?JSON?的自定義屬性名稱映射到結(jié)構(gòu)體,如何解析非結(jié)構(gòu)化的?JSON?字符串
    2023-01-01
  • Go語(yǔ)言JSON編解碼神器之marshal的運(yùn)用

    Go語(yǔ)言JSON編解碼神器之marshal的運(yùn)用

    這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言中JSON編解碼神器——marshal的運(yùn)用,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-09-09
  • Go語(yǔ)言異步API設(shè)計(jì)的扇入扇出模式詳解

    Go語(yǔ)言異步API設(shè)計(jì)的扇入扇出模式詳解

    這篇文章主要為大家介紹了Go語(yǔ)言異步API設(shè)計(jì)的扇入扇出模式示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • Go語(yǔ)言編譯時(shí)為exe添加圖標(biāo)和屬性信息的方法

    Go語(yǔ)言編譯時(shí)為exe添加圖標(biāo)和屬性信息的方法

    在使用Go語(yǔ)言開發(fā)應(yīng)用程序時(shí),有個(gè)非常方便的地方就是編譯得到的可執(zhí)行文件可以不依賴任何動(dòng)態(tài)鏈接庫(kù)、并且不需要任何運(yùn)行環(huán)境即可運(yùn)行,本文給大家介紹Go編譯時(shí)為exe添加圖標(biāo)和屬性信息的方法,需要的朋友可以參考下
    2023-09-09
  • golang實(shí)現(xiàn)大文件讀取的代碼示例

    golang實(shí)現(xiàn)大文件讀取的代碼示例

    在實(shí)際工作,我們需要讀取大數(shù)據(jù)文件,文件可能上G百G,所以我們不可能一次性的讀取到內(nèi)存,接下來(lái)本文給大家介紹了golang實(shí)現(xiàn)大文件讀取的示例,需要的朋友可以參考下
    2024-04-04
  • Go語(yǔ)言中append函數(shù)用法分析

    Go語(yǔ)言中append函數(shù)用法分析

    這篇文章主要介紹了Go語(yǔ)言中append函數(shù)用法,對(duì)比使用append函數(shù)與不使用append函數(shù)的兩個(gè)實(shí)例,詳細(xì)分析了Go語(yǔ)言中append函數(shù)的功能,需要的朋友可以參考下
    2015-02-02

最新評(píng)論