JUC10 利用AQS实现一个自己的Latch门门

共 6016字,需浏览 13分钟

 ·

2023-10-02 10:26


11-8 DIY一次性门闩



利用 AQS实现一个自己的Latch门门



3b7d394c42f4d3533d1f3e673eb1f181.webp


AQS用法


第一步 :   写一个类,想好协作的逻辑,实现 获取 / 释放 方法。


第二步 : 内部写一个 Sync类继承AbstractQueuedSynchronizer


第三步 : 根据是否 独占 来重写 tryAcquire / tryRelease tryAcquireShared (int acquires)和 tryReleaseShared (intreleases)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法


11-8-1 实现获取 /释放方法



      
public void signal() {


sync.releaseShared(0);


}


public void await() {


sync.acquireShared(0);


}


11-8-1-1 await()



l 起初门闩是关闭的 , 多个线程请求来了以后, 因为门闩是关闭的, 所以调用者调用await()方法, 则会进行阻塞, 进入等待状态


l 直到后续有一个线程调用了 signal()方法, 则相当于将门闩打开, 此前陷入等待的线程都会被释放


l 我们内部使用 AQS的state来表示这个门是否被打开


      
0: 代表默认状态, 门关闭


1: 门打开



      
private class Sync extends AbstractQueuedSynchronizer {


@Override


protected int tryAcquireShared(int arg) {


return (getState() == 1) ? 1 : -1;


}


@Override


protected boolean tryReleaseShared(int arg) {


setState(1);


return true;


}


}


分析等待方法



      
public void await() {


sync.acquireShared(0);


}


sync .acquireShared( 0 ); 方法是使用 AQS的一个基本规则, 如果使用AQS来实现一个线程协作器的话, 而且是一个共享锁就应该去调用 sync .acquireShared( 0 )


sync .acquireShared( 0 )



      
public final void acquireShared(int arg) {


if (tryAcquireShared(arg) < 0)// 去排队


doAcquireShared(arg);


}


其中的 tryAcquireShared 方法已经被我们重写


      

@Override



protected int tryAcquireShared(int arg) {


return (getState() == 1) ? 1 : -1;


}


==1, 门打开


<0, 门关闭


11-8-1-2 signal()



打开门闩的意味着就是将 stat e 的值修改为 1


      
public void signal() {


sync.releaseShared(0);


}


      
public final boolean releaseShared(int arg) {


if (tryReleaseShared(arg)) {


doReleaseShared();


return true;


}


return false;


}


同样会调用 releaseShared 方法 , releaseShared 是我们自己实现的


      

@Override



protected boolean tryReleaseShared(int arg) {


setState(1);


    return true// 返回true代表将之前的线程都唤醒了


}


11-8-2 OneShotLatch 



自己用 AQS实现一个简单的线程协作器。



OneShotLatch类的代码来自《Java并发编程实战》书


      
public class OneShotLatch {


private final Sync sync = new Sync();


public static void main(String[] args) throws InterruptedException {


OneShotLatch oneShotLatch = new OneShotLatch();


for (int i = 0; i < 10; i++) {


new Thread(new Runnable() {


@Override


public void run() {


System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败那就等待");


oneShotLatch.await();


System.out.println("开闸放行" + Thread.currentThread().getName() + "继续运行");


}


}).start();


}


Thread.sleep(5000);


oneShotLatch.signal();


new Thread(new Runnable() {


@Override


public void run() {


System.out.println(Thread.currentThread().getName() + "尝试获取latch,获取失败那就等待");


oneShotLatch.await();


System.out.println("开闸放行"+ Thread.currentThread().getName()+"继续运行");


}


}).start();


}


public void signal() {sync.releaseShared(0);}


public void await() {sync.acquireShared(0);}


private class Sync extends AbstractQueuedSynchronizer {


@Override


protected int tryAcquireShared(int arg) {


return (getState() == 1) ? 1 : -1;


}


@Override


protected boolean tryReleaseShared(int arg) {


setState(1);


return true;


}


}


}


运行结果 :


9eee0047f098e225d0bd0817b718f194.webp


近期热文



参考资料


1 | JUC




责编  | 小耶哥


本期作者  | 小耶哥


平台建设及技术支持  | 小耶哥




浏览 98
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报