JUC10 利用AQS实现一个自己的Latch门门

XiaoYeGe

共 6016字,需浏览 13分钟

 ·

2023-10-02 10:26

11-8 DIY一次性门闩

利用 AQS实现一个自己的Latch门门

3b7d394c42f4d3533d1f3e673eb1f181.webp

AQS用法

第一步 :   写一个类,想好协作的逻辑,实现 获取 / 释放 方法。

第二步 : 内部写一个 Sync类继承AbstractQueuedSynchronizer

第三步 : 根据是否 独占 来重写 tryAcquire / tryRelease tryAcquireShared (int acquires)和 tryReleaseShared (intreleases)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法

11-8-1 实现获取 /释放方法

      
        public void signal() {
      
      
            sync.releaseShared(0);
      
      
        }
      
      
        public void await() {
      
      
            sync.acquireShared(0);
      
      
        }
      
    

11-8-1-1 await()

l 起初门闩是关闭的 , 多个线程请求来了以后, 因为门闩是关闭的, 所以调用者调用await()方法, 则会进行阻塞, 进入等待状态

l 直到后续有一个线程调用了 signal()方法, 则相当于将门闩打开, 此前陷入等待的线程都会被释放

l 我们内部使用 AQS的state来表示这个门是否被打开

      
        0: 代表默认状态, 门关闭
      
      
        1: 门打开
      
    

      
        private class Sync extends AbstractQueuedSynchronizer {
      
      
            @Override
      
      
            protected int tryAcquireShared(int arg) {
      
      
                return (getState() == 1) ? 1 : -1;
      
      
            }
      
      
            @Override
      
      
            protected boolean tryReleaseShared(int arg) {
      
      
                setState(1);
      
      
                return true;
      
      
            }
      
      
        }
      
    

分析等待方法

      
        public void await() {
      
      
            sync.acquireShared(0);
      
      
        }
      
    

sync .acquireShared( 0 ); 方法是使用 AQS的一个基本规则, 如果使用AQS来实现一个线程协作器的话, 而且是一个共享锁就应该去调用 sync .acquireShared( 0 )

sync .acquireShared( 0 )

      
        public final void acquireShared(int arg) {
      
      
            if (tryAcquireShared(arg) < 0)// 去排队
      
      
                doAcquireShared(arg);
      
      
        }
      
    

其中的 tryAcquireShared 方法已经被我们重写

      
        
          @Override
        
      
      
        protected int tryAcquireShared(int arg) {
      
      
            return (getState() == 1) ? 1 : -1;
      
      
        }
      
    

==1, 门打开

<0, 门关闭

11-8-1-2 signal()

打开门闩的意味着就是将 stat e 的值修改为 1

      
        public void signal() {
      
      
            sync.releaseShared(0);
      
      
        }
      
    
      
        public final boolean releaseShared(int arg) {
      
      
            if (tryReleaseShared(arg)) {
      
      
                doReleaseShared();
      
      
                return true;
      
      
            }
      
      
            return false;
      
      
        }
      
    

同样会调用 releaseShared 方法 , releaseShared 是我们自己实现的

      
        
          @Override
        
      
      
        protected boolean tryReleaseShared(int arg) {
      
      
            setState(1);
      
      
            return true// 返回true代表将之前的线程都唤醒了
      
      
        }
      
    

11-8-2 OneShotLatch 

自己用 AQS实现一个简单的线程协作器。

OneShotLatch类的代码来自《Java并发编程实战》书

      
        public class OneShotLatch {
      
      
            private final Sync sync = new Sync();
      
      
            public static void main(String[] args) throws InterruptedException {
      
      
                OneShotLatch oneShotLatch = new OneShotLatch();
      
      
                for (int i = 0; i < 10; i++) {
      
      
                    new Thread(new Runnable() {
      
      
                        @Override
      
      
                        public void run() {
      
      
                            System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败那就等待");
      
      
                            oneShotLatch.await();
      
      
                            System.out.println("开闸放行" + Thread.currentThread().getName() + "继续运行");
      
      
                        }
      
      
                    }).start();
      
      
                }
      
      
                Thread.sleep(5000);
      
      
                oneShotLatch.signal();
      
      
                new Thread(new Runnable() {
      
      
                    @Override
      
      
                    public void run() {
      
      
                        System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败那就等待");
      
      
                        oneShotLatch.await();
      
      
           System.out.println("开闸放行"+ Thread.currentThread().getName()+"继续运行");
      
      
                    }
      
      
                }).start();
      
      
            }
      
      
            public void signal() {sync.releaseShared(0);}
      
      
            public void await() {sync.acquireShared(0);}
      
      
            private class Sync extends AbstractQueuedSynchronizer {
      
      
                @Override
      
      
                protected int tryAcquireShared(int arg) {
      
      
                    return (getState() == 1) ? 1 : -1;
      
      
                }
      
      
                @Override
      
      
                protected boolean tryReleaseShared(int arg) {
      
      
                    setState(1);
      
      
                    return true;
      
      
                }
      
      
            }
      
      
        }
      
    

运行结果 :

9eee0047f098e225d0bd0817b718f194.webp

近期热文

参考资料

1 | JUC


责编  | 小耶哥

本期作者  | 小耶哥

平台建设及技术支持  | 小耶哥


浏览 8
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报