Java8为什么要新增StampedLock票据锁(邮戳锁)

共 28164字,需浏览 57分钟

 ·

2022-09-16 10:15

推荐阅读:微软全力拥抱 Java ! StampedLock是JUC并发包里面JDK1.8版本新增的一个锁,该锁提供了三种模式的读写控制,当调用获取锁的系列函数的时候,会返回一个long 型的变量,该变量被称为戳记(stamp),这个戳记代表了锁的状态。 try系列获取锁的函数,当获取锁失败后会返回为0的stamp值。当调用释放锁和转换锁的方法时候需要传入获取锁时候返回的stamp值。 StampedLockd的内部实现是基于CLH锁的,CLH锁原理:锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程,保存着一个标记位locked,用以判断当前线程是否已经释放锁。当一个线程试图获取锁时,从队列尾节点作为前序节点,循环判断所有的前序节点是否已经成功释放锁。

如下图所示:

b99eef0f8e838a7710196f0e319e5e9e.webp

我们首先看Stampedlock有哪些属性先,源码如下:

      private static final long serialVersionUID = -6001602636862214147L;

/** 获取服务器CPU核数 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();

/** 线程入队列前自旋次数 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

/** 队列头结点自旋获取锁最大失败次数后再次进入队列 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;

/** 重新阻塞前的最大重试次数 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;

/** The period for yielding when waiting for overflow spinlock */
private static final int OVERFLOW_YIELD_RATE = 7// must be power 2 - 1

/** 溢出之前用于阅读器计数的位数 */
private static final int LG_READERS = 7;

// 锁定状态和stamp操作的值
private static final long RUNIT = 1L// 每次获取读锁 进行+1
private static final long WBIT  = 1L << LG_READERS;  //写状态 1000 0000
private static final long RBITS = WBIT - 1L;  //溢出保护 0111 1111
private static final long RFULL = RBITS - 1L//最大读线程数 0111 1110
private static final long ABITS = RBITS | WBIT;   // 掩码 前8位都为1  1111 1111
private static final long SBITS = ~RBITS; // 掩码 1... 1000 0000

//锁state初始值,第9位为1,避免算术时和0冲突
private static final long ORIGIN = WBIT << 1;

// 来自取消获取方法的特殊值,因此调用者可以抛出IE
private static final long INTERRUPTED = 1L;

// WNode节点的status值
private static final int WAITING   = -1;
private static final int CANCELLED =  1;

// WNode节点的读写模式
private static final int RMODE = 0;
private static final int WMODE = 1;

/** Wait nodes */
static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait;    // 读模式使用该节点形成栈
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING, or CANCELLED
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

/** CLH队头节点 */
private transient volatile WNode whead;
/** CLH队尾节点 */
private transient volatile WNode wtail;

// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;

/** 锁队列状态, 当处于写模式时第8位为1,读模式时前7为为1-126(附加的readerOverflow用于当读者超过126时) */
private transient volatile long state;
/** 将state超过 RFULL=126的值放到readerOverflow字段中 */
private transient int readerOverflow;

StampedLockd源码中的WNote就是等待链表队列,每一个WNode标识一个等待线程,whead为CLH队列头,wtail为CLH队列尾,state为锁的状态。long型即64位,倒数第八位标识写锁状态,如果为1,标识写锁占用!下面围绕这个state来讲述锁操作。

常量标识

3种模式可以通过检查区分 (m = stamp & ABITS):
  • 写模式: m == WBIT
  • 乐观读模式: m == 0L (即使读锁已经被持有)
  • 悲观读模式: m > 0L && m <= RFULL (同步状态的拷贝,,但是stamp中的 * read hold count除了用来决定是哪个模式以外不会被使用)
这与状态编码略有不同: (state & ABITS) == 0L 表示锁没有被获取             (state & ABITS) == RBITS 这是一个特殊值,表示操作读者bit位的自旋锁溢出 其中ABITS和SBITS是作为掩码使用的,来快速检查当前锁的状态,在后面读写锁的获取中可以看到它们的使用。使用ORIGIN作为初始值也是与此相关,我们在后面讨论。而读状态正常最多只可以被获取126(RFULL)次,如果超出这个上限,那么其他读线程获取锁时需要在readreaderOverflow记录。因为readreaderOverflow不是个原子变量,所以为了保证它的同步性,需要进行同步处理。WBIT=1000 0000(即-128)
  • RBIT =0111 1111(即127)
  • SBIT =1000 0000(后7位表示当前正在读取的线程数量,清0)
StampedLock 给我们提供了3种读写模式的锁,如下:
  1. 写锁writeLock是一个独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这跟ReentrantReadWriteLock 的写锁很相似,不过要注意的是StampedLock的写锁是不可重入锁,
当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个stamp 票据变量来表示该锁的版本,如下源码所示:
      /**
 * 
 *获取写锁,获取失败会一直阻塞,直到获得锁成功
 * @return 可以用来解锁或转换模式的戳记(128的整数)
 */

public long writeLock() {
    long s, next;  
    return ((((s = state) & ABITS) == 0L &&   // 完全没有任何锁(没有读锁和写锁)的时候可以通过
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //第8位置为1
            next : acquireWrite(false0L));
}
writeLock():典型的cas操作,如果STATE等于s,设置写锁位为1(s+WBIT)。acquireWrite跟acquireRead逻辑类似,先自旋尝试、加入等待队列、直至最终Unsafe.park()挂起线程。
      private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;
    for (int spins = -1;;) { // 入队时自旋
        long m, s, ns;
        //无锁
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0)
            //持有写锁,并且队列为空
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            //恒成立
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        else if ((p = wtail) == null) { 
            //初始化队列,写锁入队列
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            //不为空,写锁入队列
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            p.next = node;
            break;//入队列成功退出循环
        }
    }

    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        //前驱节点为头节点
        if ((h = whead) == p) {
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins;;) { // spin at head
                long s, ns;
                //无锁
                if (((s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        //当前节点设置为头结点
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    break;
            }
        }
        else if (h != null) { // help release stale waiters
            WNode c; Thread w;
            //头结点为读锁将栈中所有读锁线程唤醒
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        //
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                //前驱节点置为等待状态
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    U.park(false, time);  // emulate LockSupport.park
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

并且StampedLock还提供了非阻塞tryWriteLock方法,源码如下:

      /**
 * 没有任何锁时则获取写锁,否则返回0
 *
 * @return 可以用来解锁或转换模式的戳记(128的整数),获取失败返回0
 */

public long tryWriteLock() {
    long s, next;
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : 0L);
}

/**
 * unit时间内获得写锁成功返回状态值,失败返回0,或抛出InterruptedException
 * @return 0:获得锁失败
 * @throws InterruptedException 线程获得锁之前调用interrupt()方法抛出的异常
 */

public long tryWriteLock(long time, TimeUnit unit)
    throws InterruptedException 
{
    long nanos = unit.toNanos(time);
    if (!Thread.interrupted()) {
        long next, deadline;
        if ((next = tryWriteLock()) != 0L)
            //获得锁成功
            return next;
        if (nanos <= 0L)
            //超时返回0
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
            //规定时间内获得锁结果
            return next;
    }
    throw new InterruptedException();
}

当释放该锁的时候需要调用unlockWrite方法并传递获取锁的时候的stamp参数。源码如下:

      /**
 * state匹配stamp则释放写锁,
 * @throws IllegalMonitorStateException  不匹配则抛出异常
 */

public void unlockWrite(long stamp) {
    WNode h;
    //state不匹配stamp  或者 没有写锁
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    //state += WBIT, 第8位置为0,但state & SBITS 会循环,一共有4个值
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    if ((h = whead) != null && h.status != 0)
        //唤醒继承者节点线程
        release(h);
}
unlockWrite():释放锁与加锁动作相反。将写标记位清零,如果state溢出,则退回到初始值;
  1. 「悲观锁readLock」,是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁;如果已经有线程持有写锁,其他线程请求获取该锁会被阻塞,这类似ReentrantReadWriteLock 的读锁(不同在于这里的读锁是不可重入锁)。
这里说的悲观是指在具体操作数据前,悲观的认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个stamp票据变量来表示该锁的版本,源码如下:
      /**
 *  悲观读锁,非独占锁,为获得锁一直处于阻塞状态,直到获得锁为止
 */

public long readLock() {
    long s = state, next;  
    // 队列为空   && 没有写锁同时读锁数小于126  && CAS修改状态成功      则状态加1并返回,否则自旋获取读锁
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false0L));
}
乐观锁失败后锁升级为readLock():尝试state+1,用于统计读线程的数量,如果失败,进入acquireRead()进行自旋,通过CAS获取锁。 如果自旋失败,入CLH队列,然后再自旋,如果成功获得读锁,则激活cowait队列中的读线程Unsafe.unpark(),如果最终依然失败,则Unsafe().park()挂起当前线程。
      /**
 * @param interruptible 是否允许中断
 * @param 标识超时限时(0代表不限时),然后进入循环。
 * @return next state, or INTERRUPTED
 */

private long acquireRead(boolean interruptible, long deadline) {
    WNode node = null, p;
    //自旋
    for (int spins = -1;;) {
        WNode h;
        //判断队列为空
        if ((h = whead) == (p = wtail)) {
            //定义 long m,s,ns,并循环
            for (long m, s, ns;;) {
                //将state超过 RFULL=126的值放到readerOverflow字段中
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                    //获取锁成功返回
                    return ns;
                //state高8位大于0,那么说明当前锁已经被写锁独占,那么我们尝试自旋  + 随机的方式来探测状态
                else if (m >= WBIT) {
                    if (spins > 0) {
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            //一直获取锁失败,或者有线程入队列了退出内循环自旋,后续进入队列
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        //自旋 SPINS 次
                        spins = SPINS;
                    }
                }
            }
        }
        if (p == null) { 
            //初始队列
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        //当前节点为空则构建当前节点,模式为RMODE,前驱节点为p即尾节点。
        else if (node == null)
            node = new WNode(RMODE, p);
        //当前队列为空即只有一个节点(whead=wtail)或者当前尾节点的模式不是RMODE,那么我们会尝试在尾节点后面添加该节点作为尾节点,然后跳出外层循环
        else if (h == p || p.mode != RMODE) {
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                //入队列成功,退出自旋
                break;
            }
        }
        //队列不为空并且是RMODE模式, 添加该节点到尾节点的cowait链(实际上构成一个读线程stack)中
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            //失败处理
            node.cowait = null;
        else {
            //通过CAS方法将该节点node添加至尾节点的cowait链中,node成为cowait中的顶元素,cowait构成了一个LIFO队列。
            //循环
            for (;;) {
                WNode pp, c; Thread w;
                //尝试unpark头元素(whead)的cowait中的第一个元素,假如是读锁会通过循环释放cowait链
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null

                    U.unpark(w);
                //node所在的根节点p的前驱就是whead或者p已经是whead或者p的前驱为null
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        //根据state再次积极的尝试获取锁
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT);//条件为读模式
                }
                if (whead == h && p.prev == pp) {
                    long time;
                    if (pp == null || h == p || p.status > 0) {
                        //这样做的原因是被其他线程闯入夺取了锁,或者p已经被取消
                        node = null// throw away
                        break;
                    }
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, p, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    //出现的中断情况下取消当前节点的cancelWaiter操作
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }

    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        if ((h = whead) == p) {
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    return ns;
                }
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        }
        else if (h != null) {
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                long time;
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

并且StampedLock还提供了非阻塞tryReadLock方法,源码如下:

      /**
* 可以立即获得锁,则获取读锁,否则返回0
*/

public long tryReadLock() {
    for (;;) {
        long s, m, next;
        //持有写锁返回0
        if ((m = (s = state) & ABITS) == WBIT)
            return 0L;
        //读线程数 < RFULL,CAS变更状态
        else if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                return next;
        }
        //将state超过 RFULL的值放到readerOverflow字段
        else if ((next = tryIncReaderOverflow(s)) != 0L)
            return next;
    }
}

/**
 * unit时间内获得读锁成功返回状态值,失败返回0,或抛出InterruptedException
 */

public long tryReadLock(long time, TimeUnit unit)
    throws InterruptedException 
{
    long s, m, next, deadline;
    long nanos = unit.toNanos(time);
    if (!Thread.interrupted()) {
        if ((m = (s = state) & ABITS) != WBIT) {
            if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        if (nanos <= 0L)
            return 0L;
        if ((deadline = System.nanoTime() + nanos) == 0L)
            deadline = 1L;
        if ((next = acquireRead(true, deadline)) != INTERRUPTED)
            return next;
    }
    throw new InterruptedException();
}

StampedLock的悲观读锁readLock 当释放该锁时候需要 unlockRead 并传递参数 stamp。源码如下:

      /**
 * state匹配stamp则释放读锁,
 */

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        //不匹配抛出异常
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        //小于最大记录数值
        if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        //否则readerOverflow减一
        else if (tryDecReaderOverflow(s) != 0L)
            break;
    }
}
  1. 「乐观读锁 tryOptimisticRead」,是相对于悲观锁来说的,在操作数据前并没有通过 CAS 设置锁的状态,仅仅是通过位运算测试;如果当前没有线程持有写锁,则简单的返回一个非 0 的 stamp 版本信息,
获取该 stamp 后在具体操作数据前还需要调用 validate 验证下该 stamp 是否已经不可用,也就是看当调用 tryOptimisticRead 返回 stamp 后,到当前时间是否有其它线程持有了写锁,如果是那么 validate 会返回 0, 否者就可以使用该 stamp 版本的锁对数据进行操作。由于 tryOptimisticRead 并没有使用 CAS 设置锁状态,所以不需要显示的释放该锁。 该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及 CAS 操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其它写线程已经修改了数据, 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。源码如下:
      /**
 * 获取乐观读锁,返回邮票stamp
 */

public long tryOptimisticRead() {
    long s;  //有写锁返回0.   否则返回256
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

tryOptimisticRead():如果当前没有写锁占用,返回state(后7位清0,即清0读线程数),如果有写锁,返回0,即失败。

      /**
 * 验证从调用tryOptimisticRead开始到现在这段时间内有无写锁占用过锁资源,有写锁获得过锁资源则返回false. stamp为0返回false.
 * @return 从返回stamp开始,没有写锁获得过锁资源返回true,否则返回false
 */

public boolean validate(long stamp) {
    //强制读取操作和验证操作在一些情况下的内存排序问题
    U.loadFence();
    //当持有写锁后再释放写锁,该校验也不成立,返回false
    return (stamp & SBITS) == (state & SBITS);
}

「StamedLock还支持这三种锁在一定条件下进行相互转换,例如long tryConvertToWriteLock(long stamp)期望把stamp标示的锁升级为写锁,这个函数会在下面几种情况下返回一个有效的 stamp(也就是晋升写锁成功):」

  1. 当前锁已经是写锁模式了。

  2. 当前锁处于读锁模式,并且没有其他线程是读锁模式

  3. 当前处于乐观读模式,并且当前写锁可用。

源码如下:

      /**
 * state匹配stamp时, 执行下列操作之一. 
 *   1、stamp 已经持有写锁,直接返回.  
 *   2、读模式,但是没有更多的读取者,并返回一个写锁stamp.
 *   3、有一个乐观读锁,只在即时可用的前提下返回一个写锁stamp
 *   4、其他情况都返回0
 */

public long tryConvertToWriteLock(long stamp) {
    long a = stamp & ABITS, m, s, next;
    //state匹配stamp
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        //没有锁
        if ((m = s & ABITS) == 0L) {
            if (a != 0L)
                break;
            //CAS修改状态为持有写锁,并返回
            if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                return next;
        }
        //持有写锁
        else if (m == WBIT) {
            if (a != m)
                //其他线程持有写锁
                break;
            //当前线程已经持有写锁
            return stamp;
        }
        //有一个读锁
        else if (m == RUNIT && a != 0L) {
            //释放读锁,并尝试持有写锁
            if (U.compareAndSwapLong(this, STATE, s,
                                     next = s - RUNIT + WBIT))
                return next;
        }
        else
            break;
    }
    return 0L;
}

/**
*   state匹配stamp时, 执行下列操作之一.
    1、stamp 表示持有写锁,释放写锁,并持有读锁
    2 stamp 表示持有读锁 ,返回该读锁
    3 有一个乐观读锁,只在即时可用的前提下返回一个读锁stamp
    4、其他情况都返回0,表示失败
 *
 */

public long tryConvertToReadLock(long stamp) {
    long a = stamp & ABITS, m, s, next; WNode h;
    //state匹配stamp
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        //没有锁
        if ((m = s & ABITS) == 0L) {
            if (a != 0L)
                break;
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        //写锁
        else if (m == WBIT) {
            //非当前线程持有写锁
            if (a != m)
                break;
            //释放写锁持有读锁
            state = next = s + (WBIT + RUNIT);
            if ((h = whead) != null && h.status != 0)
                release(h);
            return next;
        }
        //持有读锁
        else if (a != 0L && a < WBIT)
            return stamp;
        else
            break;
    }
    return 0L;
}

校验这个戳是否有效validate():比较当前stamp和发生乐观锁得到的stamp比较,不一致则失败。

还有一个转换成乐观锁tryConvertToOptimisticRead(long stamp) ,这里就不讲了,道理都差不多。 另外 StampedLock 的读写锁都是不可重入锁,所以当获取锁后释放锁前,不应该再调用会获取锁的操作,以避免产生死锁。 当多个线程同时尝试获取读锁和写锁的时候,谁先获取锁没有一定的规则,完全都是尽力而为,是随机的,并且该锁不是直接实现 Lock 或 ReadWriteLock 接口,而是内部自己维护了一个双向阻塞队列。 「下面通过 JDK8 里面提供的一个管理二维点的例子讲解来加深对上面讲解的理解。代码如下所示:」
      package com.hjc;

import java.util.concurrent.locks.StampedLock;

/**
 * Created by cong on 2018/6/16.
 */

public class Point {

    // 成员变量
    private double x, y;

    // 锁实例
    private final StampedLock sl = new StampedLock();

    // 排它锁-写锁(writeLock)
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    // 乐观读锁(tryOptimisticRead)
    double distanceFromOrigin() {

        // 尝试获取乐观读锁(1)
        long stamp = sl.tryOptimisticRead();
        // 将全部变量拷贝到方法体栈内(2)
        double currentX = x, currentY = y;
        // 检查在(1)获取到读锁票据后,锁有没被其它写线程排它性抢占(3)
        if (!sl.validate(stamp)) {
            // 如果被抢占则获取一个共享读锁(悲观获取)(4)
            stamp = sl.readLock();
            try {
                // 将全部变量拷贝到方法体栈内(5)
                currentX = x;
                currentY = y;
            } finally {
                // 释放共享读锁(6)
                sl.unlockRead(stamp);
            }
        }
        // 返回计算结果(7)
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // 使用悲观锁获取读锁,并尝试转换为写锁
    void moveIfAtOrigin(double newX, double newY) {
        // 这里可以使用乐观读锁替换(1)
        long stamp = sl.readLock();
        try {
            // 如果当前点在原点则移动(2)
            while (x == 0.0 && y == 0.0) {
                // 尝试将获取的读锁升级为写锁(3)
                long ws = sl.tryConvertToWriteLock(stamp);
                // 升级成功,则更新票据,并设置坐标值,然后退出循环(4)
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    // 读锁升级写锁失败则释放读锁,显示获取独占写锁,然后循环重试(5)
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            // 释放锁(6)
            sl.unlock(stamp);
        }
    }
    
}

如上代码 Point 类里面有两个成员变量(x,y) 来标示一个点的二维坐标,和三个操作坐标变量的方法,另外实例化了一个 StampedLock 对象用来保证操作的原子性。

首先分析下 move 方法,该函数作用是使用参数的增量值,改变当前 point 坐标的位置;代码先获取到了写锁,然后对 point 坐标进行修改,然后释放锁。该锁是排它锁,这保证了其它线程调用 move 函数时候会被阻塞,也保证了其它线程不能获取读锁,读取坐标的值,直到当前线程显示释放了写锁, 也就是保证了对变量 x,y 操作的原子性和数据一致性。 接下来再看 distanceFromOrigin 方法,该方法作用是计算当前位置到原点(坐标为 0,0)的距离,代码(1)首先尝试获取乐观读锁,如果当前没有其它线程获取到了写锁,那么(1)会返回一个非 0 的 stamp 用来表示版本信息,代码(2)拷贝坐标变量到本地方法栈里面。 代码(3)检查在(1)获取到的 stamp 值是否还有效,之所以还要在此校验是因为代码(1)获取读锁时候并没有通过 CAS 操作修改锁的状态,而是简单的通过与或操作返回了一个版本信息,这里校验是看在在获取版本信息到现在的时间段里面是否有其它线程持有了写锁,如果有则之前获取的版本信息就无效了。 这里如果校验成功则执行(7)使用本地方法栈里面的值进行计算然后返回。需要注意的是在代码(3) 校验成功后,代码(7)计算期间,其它线程可能获取到了写锁并且修改了 x,y 的值,而当前线程执行代码(7)进行计算时候采用的还是修改前值的拷贝,也就是操作的值是对之前值的一个拷贝,一个快照,并不是最新的值。 「也许我们会想,代码(2) 和(3)能否互换?。」 答案是明显不能的,如果位置换了,那么首先执行validate ,假设验证通过了,要拷贝x,y 值到本地方法栈,而在拷贝的过程中很有可能其他线程已经修改过了 x,y 中的一个,这就造成了数据的不一致性了。 「那么你可能还会这样会想,即使不交换代码 (2) 和(3),在拷贝 x,y 值到本地方法栈里面时,也会存在其他线程修改了x,y中的一个值,这不也会存在问题吗?」 这个确实会存在,但是别忘记了拷贝后还有一道validate,如果这时候有线程修改了x,y 中的值,那么肯定是有线程在调用 validate 前,调用 sl.tryOptimisticRead 后获取了写锁,那么进行 validate 时候就会失败。 好了知道这么多原理后,我们就会惊叹这也是乐观读设计的精妙之处也是使用时候容易出问题的地方。下面继续分析 validate 失败后会执行代码(4)获取悲观读锁,如果这时候其他线程持有写锁则代码(4)会导致的当前线程阻塞直到其它线程释放了写锁。 如果这时候没有其他线程获取到写锁,那么当前线程就可以获取到读锁,然后执行代码(5)重新拷贝新的坐标值到本地方法栈,然后就是代码(6)释放了锁,拷贝的时候由于加了读锁,所以拷贝期间其它线程获取写锁时候会被阻塞, 这保证了数据的一致性,另外这里 x,y 没有被声明为 volatie,会不会存在内存不可见性问题那?答案是不会,因为加锁的语义保证了内存可见性, 最后代码(7)使用方法栈里面数据计算返回,同理这里在计算时候使用的数据也可能不是最新的,其它写线程可能已经修改过原来的 x,y 值了。 最后一个方法 moveIfAtOrigin 作用是如果当前坐标为原点则移动到指定的位置。代码(1)获取悲观读锁,保证其它线程不能获取写锁修改 x,y 值,然后代码(2)判断如果当前点在原点则更新坐标, 代码(3) 尝试升级读锁为写锁,这里升级不一定成功,因为多个线程都可以同时获取悲观读锁,当多个线程都执行到(3)时候只有一个可以升级成功,升级成功则返回非 0 的 stamp,否非返回 0。 这里假设当前线程升级成功,然后执行步骤(4)更新 stamp 值和坐标值,然后退出循环。如果升级失败则执行步骤(5)首先释放读锁然后申请写锁,获取到写锁后在循环重新设置坐标值。最后步骤(6) 释放锁。 「使用乐观读锁还是很容易犯错误的,必须要严谨,必须要保证如下的使用顺序,用伪代码作为讲解,如下:」
      long stamp = lock.tryOptimisticRead(); //非阻塞获取版本信息
copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){ // 校验
    long stamp = lock.readLock();//获取读锁
    try {
        copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
     } finally {
       lock.unlock(stamp);//释放悲观锁
    }

}

useThreadMemoryVarables();//使用线程本地堆栈里面的数据进行操作
「总结:」StampedLock 提供的读写锁与 ReentrantReadWriteLock 类似,只是前者的都是不可重入锁。但是前者通过提供乐观读锁在多线程多读的情况下提供更好的性能,这是因为获取乐观读锁时候不需要进行 CAS 操作设置锁的状态,而只是简单的测试状态。

来源:cnblogs.com/huangjuncong/p/9191760.htm

浏览 18
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报