浅谈限流算法

ProjectDaedalus

共 1709字,需浏览 4分钟

 · 2022-04-10

本文介绍几种常见的限流算法及其在Java下的实现方式

abstract.jpeg

计数器

固定窗口计数器

为了实现流量控制,一个朴素的想法是直接对时间窗口内的请求数量进行统计。如果未达到阈值,则放行请求;反之则对请求进行拒绝。当该时间窗口过去后,则直接将计数器清零。即所谓的固定窗口计数器。该算法的示意图如下所示,可以看到在时间窗口3中,由于计数器值为5已经达到阈值,故对于第6个请求进行限流

figure 1.jpeg

而在具体实现过程中,可通过定时线程在到达一个新时间窗口时将计数器清零。Java实现如下所示

/**
 * 限流算法: 固定时间窗口计数器
 */

public class FixedWindowCounterLimiter {

    /**
     *  时间窗口大小, Unit: s
     */

    private int windowSize;

    /**
     * 限流阈值
     */

    private int limit;

    /**
     * 计数器
     */

    private AtomicInteger count;

    public FixedWindowCounterLimiter(int windowSize, int limit) {
        this.windowSize = windowSize;
        this.limit = limit;
        count = new AtomicInteger(0);

        // 启动一个线程, 定时清除计数器值
        new Thread( () -> {
            while (true) {
                try{
                    Thread.sleep(windowSize * 1000);
                }catch (InterruptedException e) {
                    System.out.println("Happen Exception: "+e.getMessage());
                }
                count.set(0);
            }
        }).start();
    }

    public boolean tryAcquire() {
        int num =  count.incrementAndGet();
        // 以达到当前窗口的请求阈值
        if( num > limit ) {
            return false;
        } else {
            return true;
        }
    }
}

测试代码如下所示

/**
 * 测试: 固定时间窗口计数器
 */

@Test
public void test1() throws Exception {
    // 限流配置, 2s内只允许通过5个
    FixedWindowCounterLimiter rateLimiter = new FixedWindowCounterLimiter(25);

    // 请求总数
    int allNum = 3;
    // 通过数
    int passNum = 0;
    // 被限流数
    int blockNum = 0;
    //模拟连续请求
    for(int i=0; i        if(rateLimiter.tryAcquire()){
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);

    // 延时以准备下一次测试
    Thread.sleep(5000);

    allNum = 14;
    passNum = 0;
    blockNum = 0;
    //模拟连续请求
    for(int i=0; i        if(rateLimiter.tryAcquire()){
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}

测试结果如下,符合预期

figure 2.jpeg

滑动窗口计数器

在固定窗口计数器中存在一个明显的缺陷——临界问题。即分别对于时间窗口1、时间窗口2来说,其窗口内的请求数量均未超过阈值。但在下图红框部分的时间范围内,其放行的请求数量却超过了阈值要求

figure 3.jpeg

而这里的滑动窗口计数器即是对固定窗口计数器的改良。具体地,其在固定时间窗口的基础上,将其划分若干个小窗口,在各小窗口中对请求数分别进行统计。整个时间窗口随着时间的流逝,不断丢弃过期的小窗口,并将统计结果放在新的小窗口,这也是其被称为滑动窗口的由来。最后根据所有小窗口的统计值之和,来判断是放行请求还是进行限流。算法示意图如下所示,可以看到其在一个限流窗口时间范围内划分为3个子窗口。当子窗口数量越多,则子窗口的时间粒度也就越小,进而统计精度也就越高

figure 4.jpeg

在时间窗口刚刚开始滑动时,由于子窗口还未被完全填充。故为便于实现,推荐将对当前的统计计数值始终放在数组最后一个元素中,如上图的T1、T2时刻。具体实现如下所示

/**
 * 限流算法: 滑动窗口计数器
 */

public class SlidingWindowCounterLimiter {
    /**
     * 时间窗口大小, Unit: s
     */

    private int windowSize;

    /**
     * 用于统计的子窗口数量,默认为10
     */

    private int slotNum;

    /**
     * 子窗口的时间长度, Unit: ms
     */

    private int slotTime;

    /**
     * 限流阈值
     */

    private int limit;

    /**
     * 存放子窗口统计结果的数组
     * note: counters[0]记为数组左边, counters[size-1]记为数组右边
     */

    private int[] counters;

    private long lastTime;

    public SlidingWindowCounterLimiter(int windowSize, int limit) {
        this(windowSize, limit, 10);
    }

    public SlidingWindowCounterLimiter(int windowSize, int limit, int slotNum) {
        this.windowSize = windowSize;
        this.limit = limit;
        this.slotNum = slotNum;

        this.counters = new int[slotNum];
        // 计算子窗口的时间长度: 时间窗口 / 子窗口数量
        this.slotTime = windowSize * 1000 / slotNum;
        this.lastTime =  System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        // 计算滑动数, 子窗口统计时所对应的时间范围为左闭右开区间, 即[a,b)
        int slideNum = (int) Math.floor( (currentTime-lastTime)/slotTime );
        // 滑动窗口
        slideWindow(slideNum);
        // 统计滑动后的数组之和
        Integer sum = Arrays.stream(counters).sum();

        // 以达到当前时间窗口的请求阈值, 故被限流直接返回false
        if( sum > limit ) {
            return false;
        } else {    // 未达到限流, 故返回true
            counters[slotNum-1]++;
            return true;
        }
    }

    /**
     * 将数组元素全部向左移动num个位置
     * @param num
     */

    private void slideWindow(int num) {
        if( num == 0 ) {
            return;
        }

        // 数组中所有元素都会被移出, 故直接全部清零
        if( num >= slotNum ) {
            Arrays.fill(counters, 0);
        } else {
            // 对于a[0]~a[num-1]而言, 向左移动num个位置后, 则直接被移出了
            // 故从a[num]开始移动即可
            for(int index=num; index                // 计算a[index]元素向左移动num个位置后的新位置索引
                int newIndex = index - num;
                counters[newIndex] = counters[index];
            }
        }

        // 更新时间
        lastTime = lastTime + num * slotTime;
    }

}

测试代码如下所示

/**
 * 测试: 滑动时间窗口计数器
 */

@Test
public void test2() throws Exception {
    // 限流配置, 2s内只允许通过5个
    SlidingWindowCounterLimiter rateLimiter = new SlidingWindowCounterLimiter(25);

    // 请求总数
    int allNum = 3;
    // 通过数
    int passNum = 0;
    // 被限流数
    int blockNum = 0;
    //模拟连续请求
    for(int i=0; i        if(rateLimiter.tryAcquire()){
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);

    // 延时以准备下一次测试
    Thread.sleep(5000);

    allNum = 14;
    passNum = 0;
    blockNum = 0;
    //模拟连续请求
    for(int i=0; i        if(rateLimiter.tryAcquire()){
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}

测试结果如下,符合预期

figure 5.jpeg

Leaky Bucket 漏桶

As a Meter Version

所谓Leaky Bucket漏桶,指的是向桶中以任意速率注水,而由于该桶底部有一个漏洞,其会以固定的速率流出水。显然如果注水速率过快,桶中水量超过桶容量时即会导致水溢出。而将这一原理应用于限流领域时,不断涌来的请求即相当于向桶中注水,如果桶中水量未超过桶容量,则放行请求;反之则对请求限流。而桶固定的漏水速率则可以理解为系统处理请求流量的能力

figure 6.png

具体实现如下所示

/**
 * 漏桶算法: As a Meter Version
 */

public class LeakyBucketLimiter1 {

    /**
     * 桶容量, Unit: 个
     */

    private long capacity;

    /**
     * 出水速率, Unit: 个/秒
     */

    private long rate;

    /**
     * 桶的当前水量
     */

    private long water;

    /**
     * 上次时间
     */

    private long lastTime;

    public LeakyBucketLimiter1(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.water = 0;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        // 获取当前时间
        long currentTime = System.currentTimeMillis();
        // 计算流出的水量: (当前时间-上次时间) * 出水速率
        long outWater = (currentTime-lastTime)/1000 * rate;
        // 计算水量: 桶的当前水量 - 流出的水量
        water = Math.max(0, water-outWater);
        // 更新时间
        lastTime = currentTime;

        // 当前水量 小于 桶容量, 则请求放行, 返回true
        if( water            water++;
            return true;
        }else{
            // 当前水量 不小于 桶容量, 则进行限流, 返回false
            return false;
        }
    }

}

测试代码如下所示

/**
 * 测试: 漏桶算法(As a Meter Version)
 */

@Test
public void test3() throws Exception {
    // 漏桶配置, 桶容量:5个, 出水率: 1个/秒
    LeakyBucketLimiter1 rateLimiter = new LeakyBucketLimiter1(51);

    // 请求总数
    int allNum = 3;
    // 通过数
    int passNum = 0;
    // 被限流数
    int blockNum = 0;
    // 模拟连续请求
    for(int i=0; i        if( rateLimiter.tryAcquire() ) {
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);

    // 延时以准备下一次测试
    Thread.sleep(8*1000);

    allNum = 22;
    passNum = 0;
    blockNum = 0;
    //模拟连续请求
    for(int i=0; i        if( rateLimiter.tryAcquire() ) {
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}

测试结果如下,符合预期

figure 7.jpeg

As a Queue Version

在As a Meter Version版本的漏桶中,当桶中水未满,请求即会直接被放行。而在漏桶的另外一个版本As a Queue Version中,如果桶中水未满,则该请求将会被暂时存储在桶中。然后以漏桶固定的出水速率对桶中存储的请求依次放行。对比两个版本的漏桶算法不难看出,As a Meter Version版本的漏桶算法可以应对、处理突发流量,只要桶中尚有足够空余即可立即放行请求;而对于As a Queue Version版本的漏桶,其只会以固定速率放行请求,无法充分利用后续系统的处理能力

figure 8.png

具体实现如下所示

/**
 * 漏桶算法:  As a Queue Version
 */

public class LeakyBucketLimiter2 {

    /**
     * 定时任务线程池, 用于以指定速率rate从阻塞队列中获取用户请求进行放行、处理
     */

    private ScheduledExecutorService threadPool;

    /**
     * 阻塞队列, 用于存储用户请求
     */

    private ArrayBlockingQueue queue;

    /**
     * 桶容量, Unit: 个
     */

    private int capacity;

    /**
     * 出水速率, Unit: 个/秒
     */

    private long rate;

    public LeakyBucketLimiter2(int capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;

        // 根据桶容量构建有界队列
        queue = new ArrayBlockingQueue<>( capacity );

        threadPool = Executors.newSingleThreadScheduledExecutor();
        // 根据出水速率rate计算从阻塞队列获取用户请求的周期, Unit: ms
        long period = 1000 / rate;
        threadPool.scheduleAtFixedRate( getTask() ,0, period, TimeUnit.MILLISECONDS);
    }

    public boolean tryAcquire(UserRequest userRequest) {
        // 添加失败表示用户请求被限流, 则返回false
        return queue.offer(userRequest);
    }

    private Runnable getTask() {
        return () -> {
            // 从阻塞队列获取用户请求
            UserRequest userRequest = queue.poll();
            if( userRequest!=null ) {
                userRequest.handle();
            }
        };
    }

    /**
     * 用户请求
     */

    @AllArgsConstructor
    public static class UserRequest {

        private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

        private String name;

        public void handle() {
            String timeStr = formatter.format( LocalTime.now() );
            String msg = "<"+timeStr+"> "+name+" 开始处理";
            System.out.println(msg);
        }
    }

}

测试代码如下所示

/**
 * 测试: 漏桶算法(As a Queue Version)
 */

@Test
public void test4() throws Exception {
    // 漏桶配置, 桶容量:5个, 出水率: 2个/秒
    LeakyBucketLimiter2 rateLimiter = new LeakyBucketLimiter2(52);

    // 请求总数
    int allNum = 7;
    // 通过数
    int passNum = 0;
    // 被限流数
    int blockNum = 0;
    // 模拟连续请求
    for(int i=1; i<=allNum; i++) {
        // 构建用户请求
        String name = "用户请求:" + i;
        LeakyBucketLimiter2.UserRequest userRequest = new LeakyBucketLimiter2.UserRequest(name);

        if( rateLimiter.tryAcquire( userRequest ) ) {
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);

    // 延时等待
    Thread.sleep(120*1000);
}

测试结果如下,符合预期

figure 9.jpeg

Token Bucket令牌桶

Token Bucket令牌桶的基本原理其实并不复杂,我们以固定速率发放令牌到令牌桶,直到达到桶的容量为止。请求每次会先到令牌桶中获取令牌,如果桶中尚有令牌、获取成功则放行请求;反之,则对请求进行限流。事实上,可以看到Token Bucket令牌桶与As a Meter Version版本的漏桶,其实是一体两面的。前者负责消耗令牌,后者负责注水。本质上是相同的,只不过是思维角度不一样。具体实现如下所示

/**
 * 令牌桶算法
 */

public class TokenBucketLimiter {

    /**
     * 桶容量, Unit: 个
     */

    private long capacity;

    /**
     * 令牌生成速率, Unit: 个/秒
     */

    private long rate;

    /**
     * 桶当前的令牌数量
     */

    private long tokens;

    /**
     * 上次时间
     */

    private long lastTime;

    public TokenBucketLimiter(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.tokens = capacity;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        // 获取当前时间
        long currentTime = System.currentTimeMillis();
        // 计算生成的令牌数量: (当前时间-上次时间) * 令牌生成速率
        long newTokenNum = (currentTime-lastTime)/1000 * rate;
        // 计算令牌数量: 桶当前的令牌数量 + 生成的令牌数量
        tokens = Math.min(capacity, tokens+newTokenNum);
        // 更新时间
        lastTime = currentTime;

        // 桶中仍有令牌, 则请求放行, 返回true
        if( tokens > 0 ) {
            tokens--;
            return true;
        }else{
            // 桶中没有令牌, 则进行限流, 返回false
            return false;
        }
    }

}

测试代码如下所示

/**
 * 测试: 令牌桶算法
 */

@Test
public void test5() throws Exception {
    // 令牌桶配置, 桶容量:5个, 令牌生成速率: 1个/秒
    TokenBucketLimiter rateLimiter = new TokenBucketLimiter(51);

    // 请求总数
    int allNum = 3;
    // 通过数
    int passNum = 0;
    // 被限流数
    int blockNum = 0;
    // 模拟连续请求
    for(int i=0; i        if( rateLimiter.tryAcquire() ) {
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);

    // 延时以准备下一次测试
    Thread.sleep(8*1000);

    allNum = 22;
    passNum = 0;
    blockNum = 0;
    //模拟连续请求
    for(int i=0; i        if( rateLimiter.tryAcquire() ) {
            passNum++;
        }else{
            blockNum++;
        }
    }
    System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);
}

测试结果如下,符合预期

figure 10.jpeg

参考文献

  1. 凤凰架构 周志明著
浏览 29
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报