Disruptor高性能之道-无锁化

分布式朝闻道

共 3153字,需浏览 7分钟

 ·

2022-03-06 05:00

JUC中的队列BlockingQueue是通过加锁实现对生产者和消费者的协调。

加锁就意味着需要牺牲高性能,换来线程安全。

有没有办法既能高性能,还能线程安全?

Disruptor给出的答案是,“无锁”。

无锁,并不是完全消除锁,而是指没有OS层面的锁。

Disruptor通过CAS(Compare And Swap)指令实现了无锁化。具体的指令是cmpxchg,本文会做简单讲解。感兴趣的读者可以自行搜索资料了解详细内容。

简单解释下CAS具体干了什么事情。

CAS, 比较并交换,Compare And Swap。顾名思义,就是通过比较值是否发生变化,决定是否要重新赋值。如果在操作期间,值没有被其他线程操作,那么就将期望的值赋值给它,否则发现期望的值与旧值不等,说明值已经变更,则不执行操作,返回操作失败。

举个栗子

比如说,旧值oldValue为1,期望的值expectValue为1,新值newValue为2。如果没有其他线程修改旧值,那么

  • expectValue == oldValue
  • 将newValue写入,当前值为2

如果在操作过程中,oldValue被其他线程操作修改为2,那么当前线程的expectValue(1)与oldValue(2)比较就不等,写入失败。

Disruptor如何进行CAS

我们知道Disruptor核心数据结构为RingBuffer,Disruptor为RingBuffer分配了一个Sequence对象,用于标识RingBuffer的头和尾,这个标识不是通过指针实现的,而是通过序号。

这个序号也就是Sequence。

虽然逻辑上RingBuffer是一个环形数组,但是在内存中是以一个普通的数组形式存在的。

RingBuffer中通过对比序号的方式对生产者和消费者间的资源进行协调。

每当生产者要往队列中加入新数据,生产者都会将当前sequence + 准备加入队列的数据量,与消费者所在位置进行比较,以判断是否存在足够的空间放这些数据,而不至于覆盖掉消费者没有消费的数据。

用体育术语就叫“套圈”。

如图所示:ringBufferSize=16,生产者当前sequence指向20,消费者sequence指向27。

我们简单计算一下这个场景下,生产者能否继续写入4个元素。

  • 对于消费者而言,27 MOD 16 = 11
  • 对于生产者而言,20 + 4 = 24(预计写入的最大序号),24 MOD 16 = 8
  • 生产者若成功写入4个元素,则sequence指向数组的第8个位置,8 < 11, 表明生产者没有覆盖消费者的进度。
  • 生产者不需要等待消费者,直接生产数据即可。而且并不会覆盖消费者未处理完的数据。

实际上,Disruptor的代码实现就是通过compareAndSet方法实现了CAS无锁化操作。

    /**
     * Atomically add the supplied value.
     *
     * @param increment The value to add to the sequence.
     * @return The value after the increment.
     */
    public long addAndGet(final long increment)
    {
        long currentValue;
        long newValue;

        do
        {
            currentValue = get();
            newValue = currentValue + increment;
        }
        while (!compareAndSet(currentValue, newValue));

        return newValue;
    }

我们可以看到,这里通过while循环不断尝试CAS操作,如果CAS操作不成功就会自旋重试,这个操作并没有使用OS层面的锁,因此效率要大幅高于OS层面的锁机制(管程)。

addAndGet调用了compareAndSet方法:

    /**
     * Perform a compare and set operation on the sequence.
     *
     * @param expectedValue The expected current value.
     * @param newValue The value to update to.
     * @return true if the operation succeeds, false otherwise.
     */
    public boolean compareAndSet(final long expectedValue, final long newValue)
    {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

可以看到最终是调用了UNSAFE的compareAndSwapLong方法,该方法为native方法,在JVM层面调用了CAS指令。

CAS指令

上文我们提到,Disruptor的CAS最终调用的是CPU层面的机器指令「cmpxchg」

该指令的详细描述:

  compxchg [ax] (隐式参数,EAX 累加器), 
           [bx] (源操作数地址), 
           [cx] (目标操作数地址)

简单解释下:

  • cmpxchg指令有三个操作数,操作数ax不在指令里面出现,是一个隐式的操作数,准确地说它是EAX累加寄存器里面的值。
  • 操作数bx是源操作数,指令会对比这个操作数和上面的累加寄存器里面的值是否相等,如果相等 CPU 会把 ZF(也就是条件码寄存器里面零标志位的值)设置为 1,然后再把操作数cx(也就是目标操作数)设置到源操作数的地址上。
  • 如果不相等的话,就把源操作数里面的值设置到累加器寄存器里面

由于cmpxchg是cpu级别的指令,因此直接调用就可以保证cas操作的原子性。

由于去除了OS层面的锁,即便CAS存在比较操作与自旋操作,其本质也是无锁化操作,这种无锁化机制消除了上下文切换,对于CPU极为友好,因此运行效率很快。

事实上,在JUC包中,也提供了大量的CAS相关工作类方便我们操作,这些类一般以atomic开头,如果去研究其实现,我们同样会发现最终是通过UNSAFE调用了底层的CAS实现,实现无锁化操作,减少上下文切换,提升代码运行速率。

加锁导致的上下文切换之所以会显著影响代码运行速度,主要原因在于获取锁的过程中,CPU需要等待OS层面的锁竞争结果,对于没有获取锁的线程需要进行挂起,此时就需要进行上下文切换。

上下文切换会把挂起线程的寄存器中的数据放到线程栈,该操作会导致加载到高速缓存中的数据也失效,进而导致程序运行速率比无锁更慢。

CAS就没有什么问题么?

当然CAS操作同样也会存在缺点,那就是由于CAS操作本身需要进行对比,如果不相等则会一直自旋(busy-wait),这样的操作会使得cpu的负载升高,全功率满负荷运行。


浏览 43
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报