浅谈限流算法
本文介绍几种常见的限流算法及其在Java下的实现方式
计数器
固定窗口计数器
为了实现流量控制,一个朴素的想法是直接对时间窗口内的请求数量进行统计。如果未达到阈值,则放行请求;反之则对请求进行拒绝。当该时间窗口过去后,则直接将计数器清零。即所谓的固定窗口计数器。该算法的示意图如下所示,可以看到在时间窗口3中,由于计数器值为5已经达到阈值,故对于第6个请求进行限流
而在具体实现过程中,可通过定时线程在到达一个新时间窗口时将计数器清零。Java实现如下所示
/**
* 限流算法: 固定时间窗口计数器
*/
public class FixedWindowCounterLimiter {
/**
* 时间窗口大小, Unit: s
*/
private int windowSize;
/**
* 限流阈值
*/
private int limit;
/**
* 计数器
*/
private AtomicInteger count;
public FixedWindowCounterLimiter(int windowSize, int limit) {
this.windowSize = windowSize;
this.limit = limit;
count = new AtomicInteger(0);
// 启动一个线程, 定时清除计数器值
new Thread( () -> {
while (true) {
try{
Thread.sleep(windowSize * 1000);
}catch (InterruptedException e) {
System.out.println("Happen Exception: "+e.getMessage());
}
count.set(0);
}
}).start();
}
public boolean tryAcquire() {
int num = count.incrementAndGet();
// 以达到当前窗口的请求阈值
if( num > limit ) {
return false;
} else {
return true;
}
}
}
测试代码如下所示
/**
* 测试: 固定时间窗口计数器
*/
@Test
public void test1() throws Exception {
// 限流配置, 2s内只允许通过5个
FixedWindowCounterLimiter rateLimiter = new FixedWindowCounterLimiter(2, 5);
// 请求总数
int allNum = 3;
// 通过数
int passNum = 0;
// 被限流数
int blockNum = 0;
//模拟连续请求
for(int i=0; i if(rateLimiter.tryAcquire()){
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
// 延时以准备下一次测试
Thread.sleep(5000);
allNum = 14;
passNum = 0;
blockNum = 0;
//模拟连续请求
for(int i=0; i if(rateLimiter.tryAcquire()){
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}
测试结果如下,符合预期
滑动窗口计数器
在固定窗口计数器中存在一个明显的缺陷——临界问题。即分别对于时间窗口1、时间窗口2来说,其窗口内的请求数量均未超过阈值。但在下图红框部分的时间范围内,其放行的请求数量却超过了阈值要求
而这里的滑动窗口计数器即是对固定窗口计数器的改良。具体地,其在固定时间窗口的基础上,将其划分若干个小窗口,在各小窗口中对请求数分别进行统计。整个时间窗口随着时间的流逝,不断丢弃过期的小窗口,并将统计结果放在新的小窗口,这也是其被称为滑动窗口的由来。最后根据所有小窗口的统计值之和,来判断是放行请求还是进行限流。算法示意图如下所示,可以看到其在一个限流窗口时间范围内划分为3个子窗口。当子窗口数量越多,则子窗口的时间粒度也就越小,进而统计精度也就越高
在时间窗口刚刚开始滑动时,由于子窗口还未被完全填充。故为便于实现,推荐将对当前的统计计数值始终放在数组最后一个元素中,如上图的T1、T2时刻。具体实现如下所示
/**
* 限流算法: 滑动窗口计数器
*/
public class SlidingWindowCounterLimiter {
/**
* 时间窗口大小, Unit: s
*/
private int windowSize;
/**
* 用于统计的子窗口数量,默认为10
*/
private int slotNum;
/**
* 子窗口的时间长度, Unit: ms
*/
private int slotTime;
/**
* 限流阈值
*/
private int limit;
/**
* 存放子窗口统计结果的数组
* note: counters[0]记为数组左边, counters[size-1]记为数组右边
*/
private int[] counters;
private long lastTime;
public SlidingWindowCounterLimiter(int windowSize, int limit) {
this(windowSize, limit, 10);
}
public SlidingWindowCounterLimiter(int windowSize, int limit, int slotNum) {
this.windowSize = windowSize;
this.limit = limit;
this.slotNum = slotNum;
this.counters = new int[slotNum];
// 计算子窗口的时间长度: 时间窗口 / 子窗口数量
this.slotTime = windowSize * 1000 / slotNum;
this.lastTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
// 计算滑动数, 子窗口统计时所对应的时间范围为左闭右开区间, 即[a,b)
int slideNum = (int) Math.floor( (currentTime-lastTime)/slotTime );
// 滑动窗口
slideWindow(slideNum);
// 统计滑动后的数组之和
Integer sum = Arrays.stream(counters).sum();
// 以达到当前时间窗口的请求阈值, 故被限流直接返回false
if( sum > limit ) {
return false;
} else { // 未达到限流, 故返回true
counters[slotNum-1]++;
return true;
}
}
/**
* 将数组元素全部向左移动num个位置
* @param num
*/
private void slideWindow(int num) {
if( num == 0 ) {
return;
}
// 数组中所有元素都会被移出, 故直接全部清零
if( num >= slotNum ) {
Arrays.fill(counters, 0);
} else {
// 对于a[0]~a[num-1]而言, 向左移动num个位置后, 则直接被移出了
// 故从a[num]开始移动即可
for(int index=num; index // 计算a[index]元素向左移动num个位置后的新位置索引
int newIndex = index - num;
counters[newIndex] = counters[index];
}
}
// 更新时间
lastTime = lastTime + num * slotTime;
}
}
测试代码如下所示
/**
* 测试: 滑动时间窗口计数器
*/
@Test
public void test2() throws Exception {
// 限流配置, 2s内只允许通过5个
SlidingWindowCounterLimiter rateLimiter = new SlidingWindowCounterLimiter(2, 5);
// 请求总数
int allNum = 3;
// 通过数
int passNum = 0;
// 被限流数
int blockNum = 0;
//模拟连续请求
for(int i=0; i if(rateLimiter.tryAcquire()){
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
// 延时以准备下一次测试
Thread.sleep(5000);
allNum = 14;
passNum = 0;
blockNum = 0;
//模拟连续请求
for(int i=0; i if(rateLimiter.tryAcquire()){
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}
测试结果如下,符合预期
Leaky Bucket 漏桶
As a Meter Version
所谓Leaky Bucket漏桶,指的是向桶中以任意速率注水,而由于该桶底部有一个漏洞,其会以固定的速率流出水。显然如果注水速率过快,桶中水量超过桶容量时即会导致水溢出。而将这一原理应用于限流领域时,不断涌来的请求即相当于向桶中注水,如果桶中水量未超过桶容量,则放行请求;反之则对请求限流。而桶固定的漏水速率则可以理解为系统处理请求流量的能力
具体实现如下所示
/**
* 漏桶算法: As a Meter Version
*/
public class LeakyBucketLimiter1 {
/**
* 桶容量, Unit: 个
*/
private long capacity;
/**
* 出水速率, Unit: 个/秒
*/
private long rate;
/**
* 桶的当前水量
*/
private long water;
/**
* 上次时间
*/
private long lastTime;
public LeakyBucketLimiter1(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.water = 0;
this.lastTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
// 获取当前时间
long currentTime = System.currentTimeMillis();
// 计算流出的水量: (当前时间-上次时间) * 出水速率
long outWater = (currentTime-lastTime)/1000 * rate;
// 计算水量: 桶的当前水量 - 流出的水量
water = Math.max(0, water-outWater);
// 更新时间
lastTime = currentTime;
// 当前水量 小于 桶容量, 则请求放行, 返回true
if( water water++;
return true;
}else{
// 当前水量 不小于 桶容量, 则进行限流, 返回false
return false;
}
}
}
测试代码如下所示
/**
* 测试: 漏桶算法(As a Meter Version)
*/
@Test
public void test3() throws Exception {
// 漏桶配置, 桶容量:5个, 出水率: 1个/秒
LeakyBucketLimiter1 rateLimiter = new LeakyBucketLimiter1(5, 1);
// 请求总数
int allNum = 3;
// 通过数
int passNum = 0;
// 被限流数
int blockNum = 0;
// 模拟连续请求
for(int i=0; i if( rateLimiter.tryAcquire() ) {
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
// 延时以准备下一次测试
Thread.sleep(8*1000);
allNum = 22;
passNum = 0;
blockNum = 0;
//模拟连续请求
for(int i=0; i if( rateLimiter.tryAcquire() ) {
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}
测试结果如下,符合预期
As a Queue Version
在As a Meter Version版本的漏桶中,当桶中水未满,请求即会直接被放行。而在漏桶的另外一个版本As a Queue Version中,如果桶中水未满,则该请求将会被暂时存储在桶中。然后以漏桶固定的出水速率对桶中存储的请求依次放行。对比两个版本的漏桶算法不难看出,As a Meter Version版本的漏桶算法可以应对、处理突发流量,只要桶中尚有足够空余即可立即放行请求;而对于As a Queue Version版本的漏桶,其只会以固定速率放行请求,无法充分利用后续系统的处理能力
具体实现如下所示
/**
* 漏桶算法: As a Queue Version
*/
public class LeakyBucketLimiter2 {
/**
* 定时任务线程池, 用于以指定速率rate从阻塞队列中获取用户请求进行放行、处理
*/
private ScheduledExecutorService threadPool;
/**
* 阻塞队列, 用于存储用户请求
*/
private ArrayBlockingQueue queue;
/**
* 桶容量, Unit: 个
*/
private int capacity;
/**
* 出水速率, Unit: 个/秒
*/
private long rate;
public LeakyBucketLimiter2(int capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
// 根据桶容量构建有界队列
queue = new ArrayBlockingQueue<>( capacity );
threadPool = Executors.newSingleThreadScheduledExecutor();
// 根据出水速率rate计算从阻塞队列获取用户请求的周期, Unit: ms
long period = 1000 / rate;
threadPool.scheduleAtFixedRate( getTask() ,0, period, TimeUnit.MILLISECONDS);
}
public boolean tryAcquire(UserRequest userRequest) {
// 添加失败表示用户请求被限流, 则返回false
return queue.offer(userRequest);
}
private Runnable getTask() {
return () -> {
// 从阻塞队列获取用户请求
UserRequest userRequest = queue.poll();
if( userRequest!=null ) {
userRequest.handle();
}
};
}
/**
* 用户请求
*/
@AllArgsConstructor
public static class UserRequest {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private String name;
public void handle() {
String timeStr = formatter.format( LocalTime.now() );
String msg = "<"+timeStr+"> "+name+" 开始处理";
System.out.println(msg);
}
}
}
测试代码如下所示
/**
* 测试: 漏桶算法(As a Queue Version)
*/
@Test
public void test4() throws Exception {
// 漏桶配置, 桶容量:5个, 出水率: 2个/秒
LeakyBucketLimiter2 rateLimiter = new LeakyBucketLimiter2(5, 2);
// 请求总数
int allNum = 7;
// 通过数
int passNum = 0;
// 被限流数
int blockNum = 0;
// 模拟连续请求
for(int i=1; i<=allNum; i++) {
// 构建用户请求
String name = "用户请求:" + i;
LeakyBucketLimiter2.UserRequest userRequest = new LeakyBucketLimiter2.UserRequest(name);
if( rateLimiter.tryAcquire( userRequest ) ) {
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
// 延时等待
Thread.sleep(120*1000);
}
测试结果如下,符合预期
Token Bucket令牌桶
Token Bucket令牌桶的基本原理其实并不复杂,我们以固定速率发放令牌到令牌桶,直到达到桶的容量为止。请求每次会先到令牌桶中获取令牌,如果桶中尚有令牌、获取成功则放行请求;反之,则对请求进行限流。事实上,可以看到Token Bucket令牌桶与As a Meter Version版本的漏桶,其实是一体两面的。前者负责消耗令牌,后者负责注水。本质上是相同的,只不过是思维角度不一样。具体实现如下所示
/**
* 令牌桶算法
*/
public class TokenBucketLimiter {
/**
* 桶容量, Unit: 个
*/
private long capacity;
/**
* 令牌生成速率, Unit: 个/秒
*/
private long rate;
/**
* 桶当前的令牌数量
*/
private long tokens;
/**
* 上次时间
*/
private long lastTime;
public TokenBucketLimiter(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = capacity;
this.lastTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
// 获取当前时间
long currentTime = System.currentTimeMillis();
// 计算生成的令牌数量: (当前时间-上次时间) * 令牌生成速率
long newTokenNum = (currentTime-lastTime)/1000 * rate;
// 计算令牌数量: 桶当前的令牌数量 + 生成的令牌数量
tokens = Math.min(capacity, tokens+newTokenNum);
// 更新时间
lastTime = currentTime;
// 桶中仍有令牌, 则请求放行, 返回true
if( tokens > 0 ) {
tokens--;
return true;
}else{
// 桶中没有令牌, 则进行限流, 返回false
return false;
}
}
}
测试代码如下所示
/**
* 测试: 令牌桶算法
*/
@Test
public void test5() throws Exception {
// 令牌桶配置, 桶容量:5个, 令牌生成速率: 1个/秒
TokenBucketLimiter rateLimiter = new TokenBucketLimiter(5, 1);
// 请求总数
int allNum = 3;
// 通过数
int passNum = 0;
// 被限流数
int blockNum = 0;
// 模拟连续请求
for(int i=0; i if( rateLimiter.tryAcquire() ) {
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
// 延时以准备下一次测试
Thread.sleep(8*1000);
allNum = 22;
passNum = 0;
blockNum = 0;
//模拟连续请求
for(int i=0; i if( rateLimiter.tryAcquire() ) {
passNum++;
}else{
blockNum++;
}
}
System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}
测试结果如下,符合预期
参考文献
凤凰架构 周志明著