go-redis 连接池实现思想
本着 ”拿来“ 主义,写了这篇文章,主要是用来做记录日常工作,只是拿来别人的思想,提高自己处理问题的能力,和扩展自己解决问题和搬砖的思路,也希望可以给别人带来一些思路。
为什么会有redis连接池的概念
在传统的C/S模型中,客户端和服务端需要通信的话就要建立连接,在这里每次连接都需要花费不少的资源,例如常说的TCP三次握手等。随着网络和互联网应用越来越庞大和集中,单个请求对应单个链接的模式已经满足不了高并发的场景了,所以连接池的概念就出现了,基本上网络相关的连接池都是在客户端这边维护。
连接池则可以实现在客户端建立多个链接并且不释放,当需要使用链接的时候通过一定的算法获取已经建立的链接,使用完了以后则还给连接池,这就免去了TCP建立连接所带来的系统调用所消耗的时间。上面都是瞎BB,不想看的直接跳到下面看正文。
go-redis 连接池设计思路
github地址 https://github.com/go-redis/redis
go-redis也提供了三种对应服务端的客户端模式,集群,哨兵,和单机模式, 这里我们主要从连接池方面了解下,所以三种模式在连接池这一块都是公用的。
先列一下基本的代码
https://github.com/go-redis/redis/blob/master/internal/pool/pool.go
type Options struct {//建立连接函数Dialer func() (net.Conn, error)//在链接关闭的时候的回调OnClose func(*Conn) error//连接池中的链接的最大数量PoolSize int//最小的空闲链接的数量MinIdleConns int//单个链接的最大生命周期MaxConnAge time.Duration//从连接池获取链接的超时时间PoolTimeout time.Duration//链接空闲的超时时间IdleTimeout time.Duration//检查过期的链接的周期频率IdleCheckFrequency time.Duration}// 包装net.conn的type Conn struct {netConn net.Connrd *proto.ReaderrdLocked boolwr *proto.Writer//该链接是否初始化,比如如果需要执行命令之前需要执行的auth,select db 等的标识,代表已经auth,select过Inited bool// 是否放进连接池中pooled bool// 创建的时间,超过maxconnage的链接需要淘汰createdAt time.Time// 该链接执行命令的时候所记录的时间,就是上次用过这个链接的时间点usedAt atomic.Value}// 连接池的接口type Pooler interface {NewConn() (*Conn, error)CloseConn(*Conn) errorGet() (*Conn, error)Put(*Conn)Remove(*Conn, error)Len() intIdleLen() intStats() *StatsClose() error}// 实现上面Pooler接口的实体连接池type ConnPool struct {opt *OptionsdialErrorsNum uint32 // atomiclastDialErrorMu sync.RWMutexlastDialError error// 一个带容量(poolsize)的阻塞队列,queue chan struct{}// 锁connsMu sync.Mutex// 连接池中所有的链接conns []*Conn// 连接池中空闲的链接idleConns []*Conn// 池容量,如果没有可用的话要等待poolSize int// 连接池空闲链接的数量idleConnsLen int// 连接池的状态信息,业务可以获取到作为告警等用处stats Stats// 连接池是否关闭_closed uint32 // atomic}
一般的网络应用包中,Client包封装都是将 connpool直接封在Client结构里,用的时候直接GET,用完PUT
下面我们从两个方面去看下go-redis对连接池的思路
1. 一个是服务启动连接池初始化
// NewClient returns a client to the Redis Server specified by Options.func NewClient(opt *Options) *Client {opt.init()c := Client{baseClient: baseClient{opt: opt,connPool: newConnPool(opt),},}c.baseClient.init()c.init()return &c}func newConnPool(opt *Options) *pool.ConnPool {return pool.NewConnPool(&pool.Options{Dialer: opt.Dialer,PoolSize: opt.PoolSize,MinIdleConns: opt.MinIdleConns,MaxConnAge: opt.MaxConnAge,PoolTimeout: opt.PoolTimeout,IdleTimeout: opt.IdleTimeout,IdleCheckFrequency: opt.IdleCheckFrequency,})}// 服务启动的时候,redis客户端会在这里执行连接池的初始化func NewConnPool(opt *Options) *ConnPool {p := &ConnPool{opt: opt,// client获取链接之前,会向这个chan写入数据,如果能够写入说明不用等待就可以获取链接,否则需要等待其他地方从这个chan取走数据才可以获取链接,假如获取链接成功的话,在用完的时候,要向这个chan取出一个struct{}不然的话就会让别人一直阻塞,go-redis用这种方法保证池中的链接数量不会超过poolsizequeue: make(chan struct{}, opt.PoolSize),conns: make([]*Conn, 0, opt.PoolSize),idleConns: make([]*Conn, 0, opt.PoolSize),}for i := 0; i < opt.MinIdleConns; i++ {// 按照配置的最小空闲链接数,往池中add MinIdleConns个链接// 开启协程addIdleConn初始化链接的p.checkMinIdleConns()}if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {// 修复链接池,无限循环判断链接是否过期,过期的链接清理掉// 开启协程后台循环执行go p.reaper(opt.IdleCheckFrequency)}return p}// 往池中加新链接func (p *ConnPool) addIdleConn() {cn, err := p.newConn(true)if err != nil {return}// conns idleConns 是共享资源,需要加锁控制p.connsMu.Lock()p.conns = append(p.conns, cn)p.idleConns = append(p.idleConns, cn)p.connsMu.Unlock()}func NewConn(netConn net.Conn) *Conn {cn := &Conn{netConn: netConn,// 链接的出生时间点createdAt: time.Now(),}cn.rd = proto.NewReader(netConn)cn.wr = proto.NewWriter(netConn)// 链接上次使用的时间点cn.SetUsedAt(time.Now())return cn}// 修复链接池,无限循环判断链接是否过期,过期的链接清理掉func (p *ConnPool) reaper(frequency time.Duration) {ticker := time.NewTicker(frequency)defer ticker.Stop()for range ticker.C {if p.closed() {break}n, err := p.ReapStaleConns()if err != nil {internal.Logf("ReapStaleConns failed: %s", err)continue}atomic.AddUint32(&p.stats.StaleConns, uint32(n))}}// 无限循环判断链接是否过期func (p *ConnPool) ReapStaleConns() (int, error) {var n intfor {// 需要向queue chan写进数据才能往下执行,否则就会阻塞,等queue有容量p.getTurn()p.connsMu.Lock()cn := p.reapStaleConn()p.connsMu.Unlock()if cn != nil {p.removeConn(cn)}// 用完之后,就要从queue chan读取出你放进去的数据,让queue有容量写入p.freeTurn()if cn != nil {p.closeConn(cn)n++} else {break}}return n, nil}// 每次总idleConns的切片头部取出一个来判断是否过期,如果过期的话,更新idleConns,并且关闭过期链接func (p *ConnPool) reapStaleConn() *Conn {if len(p.idleConns) == 0 {return nil}cn := p.idleConns[0]if !p.isStaleConn(cn) {return nil}p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)p.idleConnsLen--return cn}// 根据链接的出生时间点和上次使用的时间点,判断该链接是否过期func (p *ConnPool) isStaleConn(cn *Conn) bool {if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {return false}now := time.Now()if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {return true}if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {return true}return false}
至此go-redis连接池的初始化就完成了,主要工作有几个
1. 初始化配置的空闲链接数
2. 单独开个协程定时循环检查空闲链接池中的链接是否过期
2. 一个是redis-client执行命令之前获取链接和用完之后归还链接
func (c *baseClient) _getConn() (*pool.Conn, error) {cn, err := c.connPool.Get()if err != nil {return nil, err}//这里主要工作是当配置配了密码和DB的时候,这个链接之前命令之前要执行auth和select db命令err = c.initConn(cn)if err != nil {c.connPool.Remove(cn, err)if err := internal.Unwrap(err); err != nil {return nil, err}return nil, err}return cn, nil}// Get returns existed connection from the pool or creates a new one.func (p *ConnPool) Get() (*Conn, error) {if p.closed() {return nil, ErrClosed}// 如果能够写入说明不用等待就可以获取链接,否则需要等待其他地方从这个chan取走数据才可以获取链接,假如获取链接成功的话,在用完的时候,要向这个chan取出一个struct{}不然的话就会让别人一直阻塞(如果在pooltimeout时间内没有等待到,就会超时返回),go-redis用这种方法保证池中的链接数量不会超过poolsizeerr := p.waitTurn()if err != nil {return nil, err}for {// 共享资源,操作要加锁p.connsMu.Lock()cn := p.popIdle()p.connsMu.Unlock()if cn == nil {break}// 判断从空闲链接切片中拿出来的链接是否过期,兜底if p.isStaleConn(cn) {_ = p.CloseConn(cn)continue}// 命中统计atomic.AddUint32(&p.stats.Hits, 1)return cn, nil}// 如果没有空闲链接的话,就重新拨号建一个atomic.AddUint32(&p.stats.Misses, 1)newcn, err := p._NewConn(true)if err != nil {// 获取链接后,要释放掉开始往queue队列里面放的数据p.freeTurn()return nil, err}return newcn, nil}// queue这里的功能固定数量的令牌桶(获取conn链接的令牌),用之前拿,用完之后放回,不会增加令牌数量也不会减少。func (p *ConnPool) getTurn() {p.queue <- struct{}{}}// 等候获取queue中的令牌func (p *ConnPool) waitTurn() error {select {case p.queue <- struct{}{}:return nildefault:timer := timers.Get().(*time.Timer)timer.Reset(p.opt.PoolTimeout)select {case p.queue <- struct{}{}:if !timer.Stop() {<-timer.C}timers.Put(timer)return nilcase <-timer.C:timers.Put(timer)atomic.AddUint32(&p.stats.Timeouts, 1)return ErrPoolTimeout}}}// 放回令牌func (p *ConnPool) freeTurn() {<-p.queue}// 链接用完之后(获取服务端响应后),要放回Pool中,最后放回令牌// 一般链接用完之后都是放回空闲链接切片里func (p *ConnPool) Put(cn *Conn) {if !cn.pooled {p.Remove(cn, nil)return}p.connsMu.Lock()p.idleConns = append(p.idleConns, cn)p.idleConnsLen++p.connsMu.Unlock()p.freeTurn()}
至此go-redis的client执行命令的时候,主要做了下面3个工作
1. 从连接池中获取链接
2. 构造命令协议,往conn链接中写入命令协议
3. 获取redis服务端响应后,把链接放回连接池
go-redis 优秀的方面
1. 针对单个链接有最大的生存时间概念
2. 针对单个链接有距离上次活跃的时间,最大的空闲时间概念
3. 单独有协程会清理上面2种过期的链接,这样可以让pool链接都保持最新( go-mysql中的连接池还没有这个功能,mysql服务端那边会把超过一定空闲的连接断掉,这也算是go-redis的一个比较好的有点 )
4. queue chan conn,用这样一个固定容量的信道,可以把它比作一个固定容量的令牌桶,用来限制控制池中最大的链接数量
5. go-redis把字符串转成[]byte,用的指针转换的方式,减少不必要的对象资源的分配减少GC的数量。
// BytesToString converts byte slice to string.func BytesToString(b []byte) string {return *(*string)(unsafe.Pointer(&b))}// StringToBytes converts string to byte slice.func StringToBytes(s string) []byte {return *(*[]byte)(unsafe.Pointer(&struct {stringCap int}{s, len(s)},))}
go-redis 不足的地方
本着优点也是缺点的论点,我还是提个不足的点
虽然清理那些超过生命周期的链接是好的方面,但是还是可以从这里在优化一下,例如虽然这个链接超过了业务自定义的生存的生命周期,但是如果这个链接本身还是有效的,这样就不是浪费一个有效的链接,而且有必要的话还要创建一个新的链接,这样的话是不是有些浪费系统资源
单纯从不释放掉有效的链接方面的解决方案
对每个链接都定时去ping一下服务端,记录下记录,剔除掉已经断开的链接。(grpc维护的连接池是这样实现的)
至此,文章到这里也结束了,如有说的不对的地方,希望大家指出和交流。
推荐阅读
站长 polarisxu
自己的原创文章
不限于 Go 技术
职场和创业经验
Go语言中文网
每天为你
分享 Go 知识
Go爱好者值得关注
