Sentinel源码剖析之常用限流算法原理实现

1、限流算法简介

限流顾名思义,就是对请求或并发数进行限制;通过对一个时间窗口内的请求量进行限制来保障系统的正常运行。如果我们的服务资源有限、处理能力有限,就需要对调用我们服务的上游请求进行限制,以防止自身服务由于资源耗尽而停止服务。

在限流中有两个概念需要了解。

  • 阈值:在一个单位时间内允许的请求量。如 QPS 限制为10,说明 1 秒内最多接受 10 次请求。
  • 拒绝策略:超过阈值的请求的拒绝策略,常见的拒绝策略有直接拒绝、排队等待等。

(1)固定窗口/滑动窗口:

固定窗口在一段时间间隔内(时间窗/时间区间),处理请求的最大数量固定,超过部分不做处理。滑动窗口解决了固定窗口临界突破的问题,只要窗口足够细分。

(2)漏桶:

漏桶大小固定,处理速度固定,但请求进入速度不固定(在突发情况请求过多时,会丢弃过多的请求)。

(3)令牌桶:

令牌桶的大小固定,令牌的产生速度固定,但是消耗令牌(即请求)速度不固定(可以应对一些某些时间请求过多的情况);每个请求都会从令牌桶中取出令牌,如果没有令牌则丢弃该次请求。

(4)分布式流控

2、固定窗口限流

在一段时间间隔内(时间窗/时间区间),处理请求的最大数量固定,超过部分不做处理。

举个例子,比如我们规定对于接口,我们1s的访问次数不能超过2个。

那么我们可以这么做:

  • 在一开 始的时候,我们可以设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于2并且该请求与第一个请求的间隔时间还在指定的时间窗口之内,那么说明请求数过多,拒绝访问;

  • 如果该请求与第一个请求的间隔时间大于指定的时间窗口,且counter的值还在限流范围内,那么就重置 counter,就是这么简单粗暴。

实现

// 计速器 限速
@Slf4j
public class CounterLimiter
{// 起始时间private static long startTime = System.currentTimeMillis();// 时间区间的时间间隔 msprivate static long interval = 1000;// 每秒限制数量private static long maxCount = 2;//累加器private static AtomicLong accumulator = new AtomicLong();// 计数判断, 是否超出限制public synchronized static boolean tryAcquire() {if ((System.currentTimeMillis() - startTime) > interval) {log.inf("窗口重置")accumulator.set(0);startTime = System.currentTimeMillis();}return accumulator.incrementAndGet() <= maxCount;}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10; i++) {Thread.sleep(250);LocalTime now = LocalTime.now();if (!tryAcquire()) {System.out.println(now + " 被限流");} else {System.out.println(now + " 做点什么");}}}
}

问题

从输出结果中可以看到大概每秒操作 3 次,由于限制 QPS 为 2,所以平均会有一次被限流。看起来可以了,不过我们思考一下就会发现这种简单的限流方式是有问题的,虽然我们限制了 QPS 为 2,但是当遇到时间窗口的临界突变时,如 1s 中的后 500 ms 和第 2s 的前 500ms 时,虽然是加起来是 1s 时间,却可以被请求 4 次

在这里插入图片描述

3、滑动窗口算法

滑动窗口算法是对固定窗口算法的改进。既然固定窗口算法在遇到时间窗口的临界突变时会有问题,那么我们在遇到下一个时间窗口前也调整时间窗口不就可以了吗?

在这里插入图片描述
上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题。,如果样本窗口定义的合理够小,基本是不会出现临界突破问题。

实现

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;/*** 滑动窗口限流工具类*/
public class RateLimiterSlidingWindow {/*** 阈值*/private int qps = 2;/*** 时间窗口总大小(毫秒)*/private long windowSize = 1000;/*** 多少个子窗口*/private Integer windowCount = 10;/*** 窗口列表*/private WindowInfo[] windowArray = new WindowInfo[windowCount];public RateLimiterSlidingWindow(int qps) {this.qps = qps;long currentTimeMillis = System.currentTimeMillis();for (int i = 0; i < windowArray.length; i++) {windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));}}/*** 1. 计算当前时间窗口* 2. 更新当前窗口计数 & 重置过期窗口计数* 3. 当前 QPS 是否超过限制** @return*/public synchronized boolean tryAcquire() {long currentTimeMillis = System.currentTimeMillis();// 1. 计算当前时间窗口int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));// 2.  更新当前窗口计数 & 重置过期窗口计数int sum = 0;for (int i = 0; i < windowArray.length; i++) {WindowInfo windowInfo = windowArray[i];if ((currentTimeMillis - windowInfo.getTime()) > windowSize) {windowInfo.getNumber().set(0);windowInfo.setTime(currentTimeMillis);}if (currentIndex == i && windowInfo.getNumber().get() < qps) {windowInfo.getNumber().incrementAndGet();}sum = sum + windowInfo.getNumber().get();}// 3. 当前 QPS 是否超过限制return sum <= qps;}private class WindowInfo {// 窗口开始时间private Long time;// 计数器private AtomicInteger number;public WindowInfo(long time, AtomicInteger number) {this.time = time;this.number = number;}// get...set...}
}

测试用例

public static void main(String[] args) throws InterruptedException {int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;System.out.println(String.format("当前QPS限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", qps, count, sleep, success));success = 0;RateLimiterSlidingWindow myRateLimiter = new RateLimiterSlidingWindow(qps);for (int i = 0; i < count; i++) {Thread.sleep(sleep);if (myRateLimiter.tryAcquire()) {success++;if (success % qps == 0) {System.out.println(LocalTime.now() + ": success, ");} else {System.out.print(LocalTime.now() + ": success, ");}} else {System.out.println(LocalTime.now() + ": fail");}}System.out.println();System.out.println("实际测试成功次数:" + success);
}

这种方式没有了时间窗口突变的问题,限流比较准确,但是因为要记录下每次请求的时间点,所以占用的内存较多。

4、漏桶算法

漏桶算法限流的基本原理为:水(对应请求)从进水口进入到漏桶里,漏桶以一定的速度出水(请求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,请求被拒绝,如图所示。

大致的漏桶限流规则如下
(1)进水口(对应客户端请求)以任意速率流入进入漏桶。
(2)漏桶的容量是固定的,出水(放行)速率也是固定的。
(3)漏桶容量是不变的,如果处理速度太慢,桶内水量会超出了桶的容量,则后面流入的水滴会溢出,表示请求拒绝。

在这里插入图片描述
漏桶算法其实很简单,可以粗略的认为就是注水漏水过程,往桶中以任意速率流入水,以一定速率流出水,当水超过桶容量(capacity)则丢弃,因为桶容量是不变的,保证了整体的速率。
在这里插入图片描述

实现

// 漏桶 限流
@Slf4j
public class LeakBucketLimiter {// 计算的起始时间private static long lastOutTime = System.currentTimeMillis();// 流出速率 每秒 2 次private static int leakRate = 2;// 桶的容量private static int capacity = 2;//剩余的水量private static AtomicInteger water = new AtomicInteger(0);//返回值说明:// false 没有被限制到// true 被限流public static synchronized boolean isLimit() {// 如果是空桶,就当前时间作为漏出的时间if (water.get() == 0) {lastOutTime = System.currentTimeMillis();water.addAndGet(1);return false;}// 执行漏水 2/s (一般情况下 waterLeaked小于桶容量才行)int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;// 计算剩余水量int waterLeft = water.get() - waterLeaked;water.set(Math.max(0, waterLeft));// 重新更新leakTimeStamplastOutTime = System.currentTimeMillis();// 尝试加水,并且水还未满 ,放行if ((water.get()) < capacity) {water.addAndGet(1);return false;} else {// 水满,拒绝加水, 限流return true;}}
}

漏桶出口的速度固定,不能灵活的应对后端能力提升。比如,通过动态扩容,后端流量从1000QPS提升到1WQPS,漏桶没有办法。但是一定时间内,流出速度(处理速度是固定的),流量整形,避免服务被冲垮

5、令牌桶算法

令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。

令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶,如图所示。

令牌桶限流大致的规则如下
(1)进水口按照某个速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。
(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。

总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。

在这里插入图片描述
在这里插入图片描述

// 令牌桶 限速
@Slf4j
public class TokenBucketLimiter {// 上一次令牌发放时间public long lastTime = System.currentTimeMillis();// 桶的容量public int capacity = 2;// 令牌生成速度 /spublic int rate = 2;// 当前令牌数量public AtomicInteger tokens = new AtomicInteger(0);;//返回值说明:// false 没有被限制到// true 被限流public synchronized boolean isLimited() {long now = System.currentTimeMillis();//时间间隔,单位为 mslong gap = now - lastTime;//计算时间段内的令牌数int reverse_permits = (int) (gap * rate / 1000);int all_permits = tokens.get() + reverse_permits;// 当前令牌数tokens.set(Math.min(capacity, all_permits));if (tokens.get() < applyCount) {// 若拿不到令牌,则拒绝return true;} else {// 还有令牌,领取令牌tokens.getAndAdd( - applyCount);lastTime = now;// log.info("剩余令牌.." + tokens);return false;}}
}

6、分布式流控

高性能的分布式限流组件可以使用Redis+Lua来开发,京东的抢购就是使用Redis+Lua完成的限流。并且无论是Nginx外部网关还是gateway内部网关,都可以使用Redis+Lua限流组件。

定义注解

/*** @description 自定义注解实现分布式限流*/
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLimit {/*** 请求限制,一秒内可以允许好多个进入(默认一秒可以支持100个)* @return*/int reqLimit() default 1000;/*** 模块名称* @return*/String reqName() default "";
}

注解实现

/*** @description MyRedisLimiter注解的切面类*/
@Aspect
@Component
public class RedisLimiterAspect {private final Logger logger = LoggerFactory.getLogger(RedisLimitStream.class);private static String EXPIRE_TIME = "1";/*** 当前响应请求*/@Autowiredprivate HttpServletResponse response;/*** redis服务*/@Autowiredprivate RedisService redisService;/*** 执行redis的脚本文件*/@Autowiredprivate RedisScript<Boolean> rateLimitLua;/*** 对所有接口进行拦截*/@Pointcut("@annotation(xxx.RedisLimit)")public void pointcut(){}/*** 对切点进行继续处理*/@Around("pointcut()")public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{//使用反射获取RedisLimit注解MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();//没有添加限流注解的方法直接放行RedisLimit redisLimit = signature.getMethod().getDeclaredAnnotation(RedisLimit.class);if(ObjectUtils.isEmpty(redisLimit)){return proceedingJoinPoint.proceed();}//List设置Lua的KEYS[1]List<String> keyList = new ArrayList<>();// 当前秒数作为 keykeyList.add("ip:" + (System.currentTimeMillis() / 1000));//获取注解上的参数,获取配置的速率//List设置Lua的ARGV[1]int value = redisLimit.reqLimit();// 调用Redis执行lua脚本,未拿到令牌的,直接返回提示boolean acquired = redisService.execute(rateLimitLua, keyList, value,EXPIRE_TIME);logger.info("执行lua结果:" + acquired);if(!acquired){this.limitStreamBackMsg();return null;}//获取到令牌,继续向下执行return proceedingJoinPoint.proceed();}/*** 被拦截的人,提示消息*/private void limitStreamBackMsg() {response.setHeader("Content-Type", "text/html;charset=UTF8");PrintWriter writer = null;try {writer = response.getWriter();writer.println("{\"code\":503,\"message\":\"当前排队人较多,请稍后再试!\",\"data\":\"null\"}");writer.flush();} catch (Exception e) {e.printStackTrace();} finally {if (writer != null) {writer.close();}}}
}

定义redis配置文件

/*** @description 实现redis的编码方式*/
@Configuration
public class RedisConfiguration {/*** 初始化将lua脚本加载到redis脚本中* @return*/@Beanpublic DefaultRedisScript loadRedisScript() {DefaultRedisScript redisScript = new DefaultRedisScript();redisScript.setLocation(new ClassPathResource("limit.lua"));redisScript.setResultType(Boolean.class);return redisScript;}
}

定义固定窗口 lua脚本

local count = redis.call("incr",KEYS[1])
if count == 1 thenredis.call('expire',KEYS[1],ARGV[2])
end
if count > tonumber(ARGV[1]) thenreturn 0
end
return 1

封装方法调用

  /*** 执行lua脚本* @param redisScript lua源代码脚本* @param keyList* @param value* @return*/public boolean execute(RedisScript<Boolean> redisScript, List<String> keyList, int value,,String expireTime) {return redisTemplate.execute(redisScript, keyList, String.valueOf(value),expireTime);}

7、lua脚本

7.1、固定窗口lua脚本

Redis 中的固定窗口限流是使用 incr 命令实现的,incr 命令通常用来自增计数;如果我们使用时间戳信息作为 key,自然就可以统计每秒的请求量了,以此达到限流目的。

这里有两点要注意。

  • 对于不存在的 key,第一次新增时,value 始终为 1。
  • INCR 和 EXPIRE 命令操作应该在一个原子操作中提交,以保证每个 key 都正确设置了过期时间,不然会有 key 值无法自动删除而导致的内存溢出。

由于 Redis 中实现事务的复杂性,所以这里直接只用 lua 脚本来实现原子操作。下面是 lua 脚本内容。

local count = redis.call("incr",KEYS[1])
if count == 1 thenredis.call('expire',KEYS[1],ARGV[2])
end
if count > tonumber(ARGV[1]) thenreturn 0
end
return 1

7.1、滑动窗口lua脚本

通过对上面的基于 incr 命令实现的 Redis 限流方式的测试,我们已经发现了固定窗口限流所带来的问题,在这篇文章的第三部分已经介绍了滑动窗口限流的优势,它可以大幅度降低因为窗口临界突变带来的问题,那么如何使用 Redis 来实现滑动窗口限流呢?

这里主要使用 ZSET 有序集合来实现滑动窗口限流,ZSET 集合有下面几个特点:

  • ZSET 集合中的 key 值可以自动排序。
  • ZSET 集合中的 value 不能有重复值。
  • ZSET 集合可以方便的使用 ZCARD 命令获取元素个数。
  • ZSET 集合可以方便的使用 ZREMRANGEBYLEX 命令移除指定范围的 key 值。

基于上面的四点特性,可以编写出基于 ZSET 的滑动窗口限流 lua 脚本。

--KEYS[1]: 限流 key
--ARGV[1]: 时间戳 - 时间窗口
--ARGV[2]: 当前时间戳(作为score)
--ARGV[3]: 阈值
--ARGV[4]: score 对应的唯一value
-- 1. 移除时间窗口之前的数据
redis.call('zremrangeByScore', KEYS[1], 0, ARGV[1])
-- 2. 统计当前元素数量
local res = redis.call('zcard', KEYS[1])
-- 3. 是否超过阈值
if (res == nil) or (res < tonumber(ARGV[3])) thenredis.call('zadd', KEYS[1], ARGV[2], ARGV[4])return 1
elsereturn 0
end

不想自己实现,可以使用redision,Guava

参考
https://www.cnblogs.com/crazymakercircle/p/15187184.html
https://www.jb51.net/article/256542.htm
https://mp.weixin.qq.com/s/xFtQTueVEd1snfYNRoVlmA


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部