Semaphore自白:限流器用我就对了!

互联网全栈架构

共 5108字,需浏览 11分钟

 ·

2021-04-25 10:31

作者 | 王磊

来源 | Java中文社群(ID:javacn666)

转载请联系授权(微信ID:GG_Stone)

大家好,我是 Semaphore,我的中文名字叫“信号量”,我来自 JUC 家族(java.util.concurrent)

我们家族有很多优秀的成员,比如:CountDownLatch:等待其他线程都执行完再执行某线程CyclicBarrier:循环阻塞一组线程,直到某个事件达成,当然我也不比他们弱哦 罒ω罒。

以下是我的个人简历,希望各位读者老爷们给个好评和三连,先在此谢过了~

基本信息

  • 姓名:Semaphore
  • 中文名:(计数)信号量
  • 出生日期:JDK 1.5
  • 籍贯:JUC(java.util.concurrent)
  • 用途:Java 中的一个同步器,与 CountDownLatch 和 CyclicBarrier 不同,Semaphore 是用来管理许可证的,线程在调用 acquire() 方法时,如果没有许可证,那么线程就会阻塞等待,直到有许可证时才能继续执行。许可证使用 release() 方法来发布(发布一个许可证),调用 acquire() 方法时,如果有证书会减少许可证并继续执行后面的代码,如果没有证书只能阻塞等待许可证,而 Semaphore 在创建时会声明许可证的最大数量。

专业技能

我的专业技能就是“管理证书”,使用此技能可以轻松的实现「限流」功能

什么是限流?

比如五一小长假快到了,到那时会有大量的人去各个景区游玩,但是每个景区能容纳的人是有限的,比如大西安的大唐芙蓉园,它的日承载量是 6 万人次,也就是说每天最多能让 6 万来这里游玩,但五一的时候会来很多的人,比如突然来了 10 万人,那这个时候就只能「限流」排队等待入园了。

也就说,大唐芙蓉园会让 6 万人先进去玩,剩余的人在门口等待排队,当有人从里面出来的时候,才允许另一个排队的人进去。工作人员会把人数始终控制在 6 万人以下,这样做的目的是为了让游玩的人有一个好的体验,不至于造成一些意外事故,比如踩踏事件什么的,一定程度上保证了社会的稳定,也便于景区良好的口碑建立和日后的正常运营,而这种排队限制最大人数的行为就是「限流」

再来举个例子,比如以车辆的限号来说,它也是限流的一种常见场景。这样做的好处,一方面是可以保护环境尽可能少一些碳排放,另一方面能有效的缓解上、下班高峰时段的拥堵情况。尤其是在大西安,很难想象如果不限号,那么会堵成什么样?(PS:让原本本不富裕的生活更是雪上加霜...)

咱们再从生活中的事例回到程序当中,假设一个程序只能为 10W 人提供服务,突然有一天因为某个热点事件,造成了系统短时间内的访问量迅速增加到了 50W,那么导致的直接结果是系统崩溃,任何人都不能用系统了,显然只有少人数能用远比所有人都不能用更符合我们的预期,因此这个时候我们要使用「限流」了。

使用Semaphore实现限流

Semaphore 在创建的时候可以设置证书的数量,相当于设置了限流的最大值,再通过 release() 方法来发放证书,通过 acquire() 方法来阻塞并等待证书,这样就通过控制证书的方式来实现限流功能了。

项目经验

接下来,咱们使用代码的方式来演示 Semaphore 的使用。我们以停车场的限流为例,假设整个停车场只有 2 个车位(车位虽少,但足矣说明问题),但来停车的却有 5 辆车,显然车位不够用了,此时需要保证停车场最多只能有 2 辆车,接下来咱们使用 Semaphore 来实现车辆的限流功能,具体实现代码如下:

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* Author:磊哥
* By:Java中文社群
*/

public class SemaphoreExample {
// 创建信号量
static Semaphore semaphore = new Semaphore(2);

public static void main(String[] args) {

// 创建 5 个固定的线程数
ExecutorService threadPool = Executors.newFixedThreadPool(5);

// 定义执行任务
Runnable runnable = new Runnable() {
@Override
public void run() {
// 拿到当前线程的名称
String tname = Thread.currentThread().getName();
System.out.println(String.format("老司机:%s,停车场外排队,时间:%s",
tname, new Date()));
try {
// 执行此行,让所有线程先排队等待进入停车场
Thread.sleep(100);
// 执行阻塞
semaphore.acquire();
System.out.println(String.format("老司机:%s,已进入停车场,时间:%s",
tname, new Date()));
Thread.sleep(1000);
System.out.println(String.format("老司机:%s,离开停车场,时间:%s",
tname, new Date()));
// 释放锁
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

// 执行任务 1
threadPool.submit(runnable);

// 执行任务 2
threadPool.submit(runnable);

// 执行任务 3
threadPool.submit(runnable);

// 执行任务 4
threadPool.submit(runnable);

// 执行任务 5
threadPool.submit(runnable);

// 等线程池任务执行完之后关闭
threadPool.shutdown();
}
}

以上代码的执行结果如下:

从上述的结果我们可以看出,当有 5 辆车同时需要进入停车场时,因为停车场的停车位只有 2 个,所以停车场最多只能容纳 2 辆车。此时我们通过 Semaphore 的 acquire 方法(阻塞等待)和 release 方法(颁发一个证书)顺利的实现了限流的功能,让停车场的车辆数始终控制在 2 辆车以下(等于或小于 2 辆车)。

个人评价

我(Semaphore)实现证书控制手段有两种,一种公平模式和非公平模式,当然为了执行的性能考虑,默认情况下我采取的是非公平的方式,具体实现可见源码:

public Semaphore(int permits) {
sync = new NonfairSync(permits); // 非公平模式
}

关于公平模式和非公平模式

所谓的公平模式就是以调用 acquire() 的先后顺序来决定获取许可证的顺序的,公平模式遵循先进先出(FIFO)原则;而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。

显然使用非公平的模式性能更高,因为它会把许可证发放给刚好准备好的线程,而不用再根据先后顺序去“叫号”了。

使用公平模式

当然,你可以手动选择使用公平模式来运行 Semaphore,Semaphore 提供了两个构造函数,源码如下:

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

如果想用公平模式就可以使用第二个构造函数 Semaphore(int permits, boolean fair),将 fair 值设置为 true 就是公平模式来获取证书了。

其他补充

我还提供了一些其他方法,用于实现更多的功能,详情如下:

  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • boolean isFair():查询 Semaphore 使用的是公平模式还是非公平模式,如果此信号量使用的是公平模式则返回 true。
  • void release(int permits):释放给定数量的许可证,将其返回到信号量。
  • tryAcquire():从这个信号量获得许可证,只有在调用时可以使用该许可证。
  • tryAcquire(int permits):从这个信号量获取给定数量的许可证,只有在调用时全部可用。
  • tryAcquire(int permits, long timeout, TimeUnit unit):从该信号量获取给定数量的许可证,如果在给定的等待时间内全部可用,并且当前线程尚未 interrupted。
  • tryAcquire(long timeout, TimeUnit unit):如果在给定的等待时间内可用,并且当前线程尚未 到达 interrupted,则从该信号量获取许可。
  • void reducePermits(int reduction) :减少可用的许可证数量 reduction 个,它是 protected 方法。
  • Collection getQueuedThreads() :返回所有等待获取许可证的线程集合,它是 protected 方法。

总结

Semaphore 信号量是用来管理一组证书的,默认情况下它采取的是非公平的方式来管理证书,这样做的目的是为了实现高性能。Semaphore 中包含了两个重要的方法:release() 方法发布一个许可证书;acquire() 方法阻塞并等待一个证书。当线程调用了 acquire() 方法只有拥有了证书才能继续执行,因此可以使用 Semaphore 来实现限流。

推荐阅读:

Kafka原理篇:图解kakfa架构原理

架构设计方法论

从面试角度一文学完 Kafka

数据库跟缓存的双写一致性

全网最详尽的负载均衡原理图解


互联网全栈架构

浏览 15
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报