亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

關(guān)于redigo中PubSub的一點(diǎn)小坑分析

 更新時(shí)間:2019年01月03日 10:41:02   作者:Chen Jiehua  
這篇文章主要給大家介紹了關(guān)于redigo中PubSub的一點(diǎn)小坑的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

前言

最近在用 golang 做一些 redis 相關(guān)的操作,選用了 redigo 這個(gè)第三方庫。然后在使用 Pub/Sub 的時(shí)候,卻發(fā)現(xiàn)了一個(gè)小坑……

Redis Client

首先,我們來初始化一個(gè)帶連接池的 Redis Client:

import (
	"github.com/gomodule/redigo/redis"
)

type RedisClient struct {
	pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := &redis.Pool{
		MaxIdle:  10,
		IdleTimeout: 300 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := &RedisClient{
		pool: pool,
	}
	return client
}

Publish

然后我們可以簡單的實(shí)現(xiàn)一個(gè) publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close()
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}

Subscribe

接下來就是一個(gè)稍微復(fù)雜點(diǎn)的帶有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

最后,我們寫一個(gè)簡單地 main 函數(shù)來調(diào)用 publish & subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}


咋一看之下,好像并沒有什么異常?然而,如果我們這時(shí)候去看 redis 的 tcp 連接,就可以發(fā)現(xiàn)一些貓膩:

$sudo netstat -antp | grep redis
tcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一個(gè)連接,而 connection pool 似乎沒有什么作用。

更進(jìn)一步地調(diào)試,我們發(fā)現(xiàn)在 defer psc.Close() 的時(shí)候就卡住了,也就是上面的 10 個(gè) goroutine 其實(shí)并沒有正常退出。

Concurrent

排查許久之后,終于定位到了問題!引用 redigo 的說明

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是說,雖然一個(gè)連接可以在不同的 goroutine 并發(fā)調(diào)用 Receive() 和 Subscribe()(subscribe調(diào)用了send和flush) ,但是卻不能再有其他并發(fā)操作(比如 Close())。

其他相似的問題還可以參考 issue

Fix

知道了上面的原因之后,我們稍微修改一下 defer psc.Close() 的位置即可解決問題:

	// start a new goroutine to receive message
	go func() {
		// IMPORTANT!
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • 淺談Redis中bind的坑

    淺談Redis中bind的坑

    本文主要介紹了淺談Redis中bind的坑,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • Redis?如何清空所有數(shù)據(jù)

    Redis?如何清空所有數(shù)據(jù)

    這篇文章主要介紹了Redis?如何清空所有數(shù)據(jù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-08-08
  • 虛擬機(jī)下的Redis無法訪問報(bào)錯(cuò)500解決方法

    虛擬機(jī)下的Redis無法訪問報(bào)錯(cuò)500解決方法

    這篇文章主要介紹了虛擬機(jī)下的Redis無法訪問,報(bào)錯(cuò)500解決方法,由于我的redis是在虛擬機(jī)下安裝的,無法訪問redis的原因是因?yàn)樘摂M機(jī)的ip地址和主機(jī)不同,文中通過圖文結(jié)合給出了詳細(xì)的解決方法,需要的朋友可以參考下
    2024-02-02
  • Redis持久化深入詳解

    Redis持久化深入詳解

    這篇文章主要介紹了Redis持久化深入詳解,講解的還是比較詳細(xì)的,有感興趣的同學(xué)可以學(xué)習(xí)下
    2021-03-03
  • 在Centos?8.0中安裝Redis服務(wù)器的教程詳解

    在Centos?8.0中安裝Redis服務(wù)器的教程詳解

    由于考慮到linux服務(wù)器的性能,所以經(jīng)常需要把一些中間件安裝在linux服務(wù)上,今天通過本文給大家介紹下在Centos?8.0中安裝Redis服務(wù)器的詳細(xì)過程,感興趣的朋友一起看看吧
    2022-03-03
  • redis過期回調(diào)坑的解決

    redis過期回調(diào)坑的解決

    Redis提供了一種過期回調(diào)的機(jī)制,可以在某個(gè)鍵過期時(shí)觸發(fā)一個(gè)回調(diào)函數(shù),然而,在實(shí)際使用中,我們往往會(huì)遇到一些災(zāi)難性的問題,其中一個(gè)就是在使用過期回調(diào)的時(shí)候,我們可能會(huì)遭遇到無法預(yù)料的錯(cuò)誤,本文就詳細(xì)的介紹一下
    2023-09-09
  • Springboot/Springcloud項(xiàng)目集成redis進(jìn)行存取的過程解析

    Springboot/Springcloud項(xiàng)目集成redis進(jìn)行存取的過程解析

    大家都知道Redis支持五種數(shù)據(jù)類型:string(字符串),hash(哈希),list(列表),set(集合),zset(sorted set:有序集合),本文重點(diǎn)給大家介紹Springboot/Springcloud項(xiàng)目集成redis進(jìn)行存取的過程,需要的朋友參考下吧
    2021-12-12
  • Redis?HyperLogLog數(shù)據(jù)統(tǒng)計(jì)輕量級(jí)解決方案詳解

    Redis?HyperLogLog數(shù)據(jù)統(tǒng)計(jì)輕量級(jí)解決方案詳解

    這篇文章主要為大家介紹了Redis?HyperLogLog數(shù)據(jù)統(tǒng)計(jì)輕量級(jí)解決方案詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • redis哈希類型_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    redis哈希類型_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    這篇文章主要介紹了redis哈希類型的常用方法及原理淺析,感興趣的朋友一起看看吧
    2017-08-08
  • Redis特殊數(shù)據(jù)類型HyperLogLog基數(shù)統(tǒng)計(jì)算法講解

    Redis特殊數(shù)據(jù)類型HyperLogLog基數(shù)統(tǒng)計(jì)算法講解

    這篇文章主要為大家介紹了Redis特殊數(shù)據(jù)類型HyperLogLog基數(shù)統(tǒng)計(jì)算法講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05

最新評(píng)論