JuiceFS 源码阅读-中

共 39431字,需浏览 79分钟

 ·

2021-08-07 09:09

JuiceFS锁的实现

JuiceFS的锁实现,目前同时实现了BSD locks(对应Flock)和POSIX locks(对应Setlk)。细节上最大区别就是BSD locks只能以FD为最小控制单位(简单理解为单文件加锁,锁定的是文件描述符fd对应的文件),而POSIX locks可以在一个文件中以文件的offset+length的方式进行加锁(按文件内容进行范围加锁)。

BSD locks

//pkg/vfs/vfs_unix.go

func Flock(ctx Context, ino Ino, fh uint64, owner uint64, typ uint32, block bool) (err syscall.Errno) {
    var name string
    var reqid uint32
    defer func() { logit(ctx, "flock (%d,%d,%016X,%s,%t): %s", reqid, ino, owner, name, block, strerr(err)) }()
    switch typ {
    case syscall.F_RDLCK:
        name = "LOCKSH"
    case syscall.F_WRLCK:
        name = "LOCKEX"
    case syscall.F_UNLCK:
        name = "UNLOCK"
    default:
        err = syscall.EINVAL
        return
    }

    if IsSpecialNode(ino) {
        err = syscall.EPERM
        return
    }
    h := findHandle(ino, fh)
    if h == nil {
        err = syscall.EBADF
        return
    }
    h.addOp(ctx)
    defer h.removeOp(ctx)
    err = m.Flock(ctx, ino, owner, typ, block) //核心在元数据部分的控制,具体参考下面部分代码注释
    if err == 0 {
        h.Lock()
        if typ == syscall.F_UNLCK {
            h.locks &= 2
        } else {
            h.locks |= 1
            h.flockOwner = owner
        }
        h.Unlock()
    }
    return
}
//pkg/meta/sql_unix.go
func (m *dbMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, block bool) syscall.Errno {
    if ltype == syscall.F_UNLCK {//如果为解锁操作,则只需要删除对应的db记录即可
        return errno(m.txn(func(s *xorm.Session) error {
            _, err := s.Delete(&flock{Inode: inode, Owner: owner, Sid: m.sid})
            return err
        }))
    }
    var err syscall.Errno
    //循环处理加锁请求,分为阻塞(block=true)和非阻塞两种类型操作
    for {
        err = errno(m.txn(func(s *xorm.Session) error {
        //获取inode信息,避免锁指向的对象不存在,成为空锁。
            if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                if err == nil && !exists {
                    err = syscall.ENOENT
                }
                return err
            }
            //查询inode关联的全部锁信息
            rows, err := s.Rows(&flock{Inode: inode})
            if err != nil {
                return err
            }
            type key struct {
                sid uint64
                o   uint64
            }
            var locks = make(map[key]flock)
            var l flock
            for rows.Next() {
                if rows.Scan(&l) == nil {
                //执行迭代,将查询结果临时保存到locks数据结构中
                    locks[key{l.Sid, l.Owner}] = l
                }
            }
            rows.Close()
//判断需要加锁的类型是否为读锁,如果已经有写锁则加锁失败
            if ltype == syscall.F_RDLCK {
                for _, l := range locks {
                    if l.Ltype == 'W' {
                        return syscall.EAGAIN
                    }
                }
                //没有写锁冲突,则通过insert记录加上读锁
                return mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'R', Sid: m.sid})
            }

            //加写入锁逻辑:先判断是否已经有写入锁(判断locks中是否有重复键值),如果有则更新锁的记录,否则直接insert插入对应的记录
            me := key{m.sid, owner}
            _, ok := locks[me]
            delete(locks, me)
            if len(locks) > 0 {
                return syscall.EAGAIN
            }
            if ok {
                _, err = s.Cols("Ltype").Update(&flock{Ltype: 'W'}, &flock{Inode: inode, Owner: owner, Sid: m.sid})
            } else {
                err = mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'W', Sid: m.sid})
            }
            return err
        }))
    //非阻塞or报错直接返回结果
        if !block || err != syscall.EAGAIN {
            break
        }
        //阻塞情况下加写锁,则等待固定时长再进行下一轮加锁操作
        if ltype == syscall.F_WRLCK {
            time.Sleep(time.Millisecond * 1)
        } else {
            time.Sleep(time.Millisecond * 10)
        }
        if ctx.Canceled() {
            return syscall.EINTR
        }
    }
    return err
}

POSIX locks

按pid进行范围加锁,实现起来相对比较复杂,核心算法在updateLocks中实现。

func (m *dbMeta) Setlk(ctx Context, inode Ino, owner_ uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno {
    var err syscall.Errno
    lock := plockRecord{ltype, pid, start, end}//以pid为粒度,所以适合单机多进/线程模型,跨节点不太合适
    owner := int64(owner_)
    for {
        err = errno(m.txn(func(s *xorm.Session) error {
            if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                if err == nil && !exists {
                    err = syscall.ENOENT
                }
                return err
            }
            //unlock操作
            if ltype == F_UNLCK {
            //sid代表session ID,每个客户端的数据库连接都有一个独立的ID实例
                var l = plock{Inode: inode, Owner: owner, Sid: m.sid}
                ok, err := m.engine.Get(&l) //按inode、owner、sid三个字段组合,查询锁列表
                if err != nil {
                    return errno(err)
                }
                if !ok {
                    return nil
                }
                ls := loadLocks([]byte(l.Records)) //解析锁列表信息
                if len(ls) == 0 {
                    return nil
                }
                ls = updateLocks(ls, lock) //在已有所列表里面新增锁记录,有点复杂,之后详细介绍
                if len(ls) == 0 {
                    _, err = s.Delete(&plock{Inode: inode, Owner: owner, Sid: m.sid}) //删除锁记录
                } else {
                    _, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)}, l) //更新已有所记录
                }
                return err
            }
            //以inode为关键字,查找已有的锁列表
            rows, err := s.Rows(&plock{Inode: inode})
            if err != nil {
                return errno(err)
            }
            type key struct {
                sid   uint64
                owner int64
            }
            var locks = make(map[key][]byte)
            var l plock
            //按查询结果构建锁map
            for rows.Next() {
                if rows.Scan(&l) == nil {
                    locks[key{l.Sid, l.Owner}] = dup(l.Records)
                }
            }
            rows.Close()
            //遍历map,判断是否有冲突锁
            lkey := key{m.sid, owner}
            for k, d := range locks {
                if k == lkey {
                    continue
                }
                ls := loadLocks([]byte(d))
                for _, l := range ls {
                    // find conflicted locks
                    if (ltype == F_WRLCK || l.ltype == F_WRLCK) && end > l.start && start < l.end {
                        return syscall.EAGAIN
                    }
                }
            }
            ls := updateLocks(loadLocks([]byte(locks[lkey])), lock) //更新锁列表信息
            var n int64
            //保存锁列表记录到DB
            if len(locks[lkey]) > 0 {
                n, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)},
                    &plock{Inode: inode, Sid: m.sid, Owner: owner})
            } else {
                n, err = s.InsertOne(&plock{Inode: inode, Sid: m.sid, Owner: owner, Records: dumpLocks(ls)})
            }
            if err == nil && n == 0 {
                err = fmt.Errorf("insert/update failed")
            }
            return err
        }))
    //如果加锁失败且不进行阻塞,则直接返回结果
        if !block || err != syscall.EAGAIN {
            break
        }
        //加锁失败,阻塞,进入下一轮操作
        if ltype == F_WRLCK {
            time.Sleep(time.Millisecond * 1)
        } else {
            time.Sleep(time.Millisecond * 10)
        }
        if ctx.Canceled() {
            return syscall.EINTR
        }
    }
    return err
}

updateLocks 的代码逻辑如下,通过加上debug输出,更加容易观察其中细节

const (
    F_UNLCK = syscall.F_UNLCK
    F_RDLCK = syscall.F_RDLCK
    F_WRLCK = syscall.F_WRLCK
)

type plockRecord struct {
    ltype uint32
    pid   uint32
    start uint64
    end   uint64
}


func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
    //fmt.Println(i,"insertLocks before ls=",ls)
    nls := make([]plockRecord, len(ls)+1)
    copy(nls[:i], ls[:i])
    nls[i] = nl
    copy(nls[i+1:], ls[i:])
    ls = nls
    //fmt.Println(i,"insertLocks after ls=",ls)
    return ls
}

func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
    // ls is ordered by l.start without overlap
    var i int
    for i < len(ls) && nl.end > nl.start {
        l := ls[i]
        if l.end < nl.start {
            fmt.Println("新增锁设定的区域超过当前锁范围,查找下一个")
        } else if l.start < nl.start {
            //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("1. 当前锁包含部分新锁区域,拆分成两个锁,调整当前锁范围从[",ls[i].start,"->",ls[i].end,"]调整为[",ls[i].start,"->",nl.start,"],并在当前位置之后插入新锁 [",nl.start,"->",l.end,"]")
            //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
            ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
            ls[i].end = nl.start
            i++
            nl.start = l.end
        } else if l.end < nl.end {
            //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
            fmt.Println("2. 当前锁区间属于新锁区间,缩小当前锁范围,从[",ls[i].start,"->",ls[i].end,"]调整为[",nl.start,"->",ls[i].end,"]")
            ls[i].ltype = nl.ltype
            ls[i].start = nl.start
            nl.start = l.end
        } else if l.start < nl.end {
            //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
            ls = insertLocks(ls, i, nl)
            fmt.Println("3. 新锁与当前锁有部分内容重叠,需要在当前位置插入新锁=[",nl.start,nl.end,"],并调整下一个锁的起始位置从[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
            ls[i+1].start = nl.end
            nl.start = nl.end
        } else {
            fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("4. 新锁右侧和当前锁没有重叠(l.start>nl.end),仅需要在当前位置插入新锁=[",nl.start,nl.end,"]")
            ls = insertLocks(ls, i, nl)
            nl.start = nl.end
        }
        i++
    }

    if nl.start < nl.end {
        ls = append(ls, nl)
        fmt.Println("5. 仍然有部分尾部内容没有,补充末尾部分的锁内容,补充后=",ls)
    }

    i = 0
    //再次遍历锁列表,进行无效内容删除or区间合并操作。
    for i < len(ls) {
        if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
            // remove empty one
            //fmt.Println("删除锁列表从i=",i,"位置的内容,删除前ls=",ls)
            //fmt.Println("删除锁列表从i=",i,"位置的内容",ls[i:i+1])
            copy(ls[i:], ls[i+1:]) //从i位置开始左移1个单位
            ls = ls[:len(ls)-1] //删除末尾
            fmt.Println("6-1. 删除锁列表从i=",i,"位置的内容,删除后ls=",ls)
        } else {
            if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                fmt.Println("6-2. 锁类型相同,且首尾相接,进行区间合并操作",ls)
                // combine continuous range
                ls[i].end = ls[i+1].end
                ls[i+1].start = ls[i+1].end
                //fmt.Println("锁类型相同,且首尾相接,进行区间合并操作2",ls)
            }
            i++
        }
    }
    return ls
}

小结

整个POSIX locks的算法主要是通过遍历已有的锁列表ls(数组结构),并按照一定规则进行新增锁记录的插入(简单理解为滑动窗口查找),其中nl代表窗口滑动范围锁,l代表当前已经有的锁。

-w900

根据代码注释,大概分为4种类型的锁处理。其中lodlock代表已有的锁记录(对应l),Newlock是新增锁的记录(对应nl)

类型1:

-w881

类型2:

-w899

类型3:

-w899

类型4:

-w899

发现问题

发现一个pid同步的bug,当新锁的内容覆盖旧锁时,并未更新对应的pid记录,导致加锁虽然成功,但是锁的pid还是指向旧的pid内容。复现代码如下

package main

import (
    "fmt"
    "syscall"
)

const (
    F_UNLCK = syscall.F_UNLCK
    F_RDLCK = syscall.F_RDLCK
    F_WRLCK = syscall.F_WRLCK
)

type plockRecord struct {
    ltype uint32
    pid   uint32
    start uint64
    end   uint64
}


func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
    //fmt.Println(i,"insertLocks before ls=",ls)
    nls := make([]plockRecord, len(ls)+1)
    copy(nls[:i], ls[:i])
    nls[i] = nl
    copy(nls[i+1:], ls[i:])
    ls = nls
    //fmt.Println(i,"insertLocks after ls=",ls)
    return ls
}

func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
    // ls is ordered by l.start without overlap
    var i int
    for i < len(ls) && nl.end > nl.start {
        l := ls[i]
        if l.end < nl.start {
            fmt.Println("新增锁设定的区域超过当前锁范围,查找下一个")
        } else if l.start < nl.start {
            //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("1. 当前锁包含部分新锁区域,拆分成两个锁,调整当前锁范围从[",ls[i].start,"->",ls[i].end,"]调整为[",ls[i].start,"->",nl.start,"],并在当前位置之后插入新锁 [",nl.start,"->",l.end,"]")
            //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
            ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
            ls[i].end = nl.start
            i++
            nl.start = l.end
        } else if l.end < nl.end {
            //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
            fmt.Println("2. 当前锁区间属于新锁区间,缩小当前锁范围,从[",ls[i].start,"->",ls[i].end,"]调整为[",nl.start,"->",ls[i].end,"]")
            ls[i].ltype = nl.ltype
            ls[i].start = nl.start
            //if ls[i].pid != nl.pid { //patch
            //  ls[i].pid = nl.pid
            //}
            nl.start = l.end
        } else if l.start < nl.end {
            //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
            ls = insertLocks(ls, i, nl)
            fmt.Println("3. 新锁与当前锁有部分内容重叠,需要在当前位置插入新锁=[",nl.start,nl.end,"],并调整下一个锁的起始位置从[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
            ls[i+1].start = nl.end
            nl.start = nl.end
        } else {
            fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("4. 新锁右侧和当前锁没有重叠(l.start>nl.end),仅需要在当前位置插入新锁=[",nl.start,nl.end,"]")
            ls = insertLocks(ls, i, nl)
            nl.start = nl.end
        }
        i++
    }

    if nl.start < nl.end {
        ls = append(ls, nl)
        fmt.Println("5. 仍然有部分尾部内容没有,补充末尾部分的锁内容,补充后=",ls)
    }

    i = 0
    //再次遍历锁列表,进行无效内容删除or区间合并操作。
    for i < len(ls) {
        if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
            // remove empty one
            //fmt.Println("删除锁列表从i=",i,"位置的内容,删除前ls=",ls)
            //fmt.Println("删除锁列表从i=",i,"位置的内容",ls[i:i+1])
            copy(ls[i:], ls[i+1:]) //从i位置开始左移1个单位
            ls = ls[:len(ls)-1] //删除末尾
            fmt.Println("6-1. 删除锁列表从i=",i,"位置的内容,删除后ls=",ls)
        } else {
            if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                fmt.Println("6-2. 锁类型相同,且首尾相接,进行区间合并操作",ls)
                // combine continuous range
                ls[i].end = ls[i+1].end
                ls[i+1].start = ls[i+1].end
                //fmt.Println("锁类型相同,且首尾相接,进行区间合并操作2",ls)
            }
            i++
        }
    }
    return ls
}


func Setlk( ltype uint32, start, end uint64, pid uint32) {

    lock := plockRecord{ltype, pid, start, end}
    //ls := []plockRecord{plockRecord{F_WRLCK, pid, 04},{F_WRLCK, pid, 710},{F_WRLCK, pid, 1316}}
    ls := []plockRecord{plockRecord{F_WRLCK, 10004},{F_WRLCK, 102710}}
    //ls := []plockRecord{plockRecord{F_WRLCK, pid, 14}}
    fmt.Println("before updateLocks=",ls)
    ls = updateLocks(ls, lock)
    fmt.Println("after updateLocks=",ls)
}

func main(){
    Setlk(F_WRLCK,6,13,103) //理论上加锁以后的记录应该对应pid=103,上面的patch已经修复这个问题
}

输出内容如下:

before updateLocks= [{3 100 0 4} {3 102 7 10}]
新增锁设定的区域超过当前锁范围,查找下一个
2. 当前锁区间属于新锁区间,缩小当前锁范围,从[ 7 -> 10 ]调整为[ 6 -> 10 ]
5. 仍然有部分尾部内容没有,补充末尾部分的锁内容,补充后= [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
6-2. 锁类型相同,且首尾相接,进行区间合并操作 [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
6-1. 删除锁列表从i= 2 位置的内容,删除后ls= [{3 100 0 4} {3 102 6 13}]
after updateLocks= [{3 100 0 4} {3 102 6 13}] //理论上这里的pid=102是对应的是旧锁内容,应该被新增加的锁记录pid=103覆盖


浏览 60
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报