参考文章:


限流

1. 为什么要进行限流

  1. 瞬时流量过高,服务被压垮
  2. 恶意用户高频光顾,导致服务器宕机?
  3. 消息消费过快,导致数据库压力过大,性能下降甚至崩溃?

2. 什么是限流

限流,也称流量控制。是指系统在面临高并发,或者大流量请求的情况下,限制新的请求对系统的访问,从而保证系统的稳定性。限流会导致部分用户请求处理不及时或者被拒,这就影响了用户体验。所以一般需要在系统稳定和用户体验之间平衡一下。举个生活的例子:

一些热门的旅游景区,一般会对每日的旅游参观人数有限制的。每天只会卖出固定数目的门票,比如5000张。假设在五一、国庆假期,你去晚了,可能当天的票就已经卖完了,就无法进去游玩了。即使你进去了,排队也能排到你怀疑人生。


3. 常见的限流算法

3.1 固定窗口限流算法

首先维护一个计数器,将单位时间段当做一个窗口,计数器记录这个窗口接收请求的次数。

  • 当次数少于限流阀值,就允许访问,并且计数器+1
  • 当次数大于限流阀值,就拒绝访问。
  • 当前的时间窗口过去之后,计数器清零。

假设单位时间是1秒,限流阀值为3。在单位时间1秒内,每来一个请求,计数器就加1,如果计数器累加的次数超过限流阀值3,后续的请求全部拒绝。等到1s结束后,计数器清0,重新开始计数。如下图:

pkhFua4.webp

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class FixedWindowRateLimiter {
Logger logger = LoggerFactory.getLogger(FixedWindowRateLimiter.class);
//时间窗口大小,单位毫秒
long windowSize;
//允许通过的请求数
int maxRequestCount;
//当前窗口通过的请求数
AtomicInteger counter = new AtomicInteger(0);
//窗口右边界
long windowBorder;
public FixedWindowRateLimiter(long windowSize, int maxRequestCount) {
this.windowSize = windowSize;
this.maxRequestCount = maxRequestCount;
this.windowBorder = System.currentTimeMillis() + windowSize;
}
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
if (windowBorder < currentTime) {
logger.info("window reset");
do {
windowBorder += windowSize;
} while (windowBorder < currentTime);
counter = new AtomicInteger(0);
}


if (counter.intValue() < maxRequestCount) {
counter.incrementAndGet();
logger.info("tryAcquire success");
return true;
} else {
logger.info("tryAcquire fail");
return false;
}
}
}

优缺点:

优点:实现简单,容易理解

缺点:

  1. 限流不够平滑。例如:限流是每秒3个,在第一毫秒发送了3个请求,达到限流,窗口剩余时间的请求都将会被拒绝,体验不好。

  2. 无法处理窗口边界问题。因为是在某个时间窗口内进行流量控制,所以可能会出现窗口边界效应,即在时间窗口的边界处可能会有大量的请求被允许通过,从而导致突发流量。即:如果第2到3秒内产生了150次请求,而第3到4秒内产生了150次请求,那么其实在第2秒到第4秒这两秒内,就已经发生了300次请求了,远远大于我们要求的3秒内的请求不要超过150次这个限制,如下图所示:

pkhFlGR.webp


3.2 滑动窗口限流算法

滑动窗口限流解决固定窗口临界值的问题。它将单位时间周期分为n个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。

一张图解释滑动窗口算法,如下:

pkhF3xx.webp

假设单位时间还是1s,滑动窗口算法把它划分为5个小周期,也就是滑动窗口(单位时间)被划分为5个小格子。每格表示0.2s。每过0.2s,时间窗口就会往右滑动一格。然后呢,每个小周期,都有自己独立的计数器,如果请求是0.83s到达的,0.8~1.0s对应的计数器就会加1。

我们来看下滑动窗口是如何解决临界问题的?

假设我们1s内的限流阀值还是5个请求,0.8~1.0s内(比如0.9s的时候)来了5个请求,落在黄色格子里。时间过了1.0s这个点之后,又来5个请求,落在紫色格子里。如果是固定窗口算法,是不会被限流的,但是滑动窗口的话,每过一个小周期,它会右移一个小格。过了1.0s这个点后,会右移一小格,当前的单位时间段是0.2~1.2s,这个区域的请求已经超过限定的5了,已触发限流啦,实际上,紫色格子的请求都被拒绝啦。

TIPS: 当滑动窗口的格子周期划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

public class SlidingWindowRateLimiter {
Logger logger = LoggerFactory.getLogger(FixedWindowRateLimiter.class);
//时间窗口大小,单位毫秒
long windowSize;
//分片窗口数
int shardNum;
//允许通过的请求数
int maxRequestCount;
//各个窗口内请求计数
int[] shardRequestCount;
//请求总数
int totalCount;
//当前窗口下标
int shardId;
//每个小窗口大小,毫秒
long tinyWindowSize;
//窗口右边界
long windowBorder;


public SlidingWindowRateLimiter(long windowSize, int shardNum, int maxRequestCount) {
this.windowSize = windowSize;
this.shardNum = shardNum;
this.maxRequestCount = maxRequestCount;
this.shardRequestCount = new int[shardNum];
this.tinyWindowSize = windowSize / shardNum;
this.windowBorder = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
if (windowBorder < currentTime) {
logger.info("window reset");
do {
shardId = (++shardId) % shardNum;
totalCount -= shardRequestCount[shardId];
shardRequestCount[shardId] = 0;
windowBorder += tinyWindowSize;
} while (windowBorder < currentTime);
}


if (totalCount < maxRequestCount) {
logger.info("tryAcquire success:{}", shardId);
shardRequestCount[shardId]++;
totalCount++;
return true;
} else {
logger.info("tryAcquire fail");
return false;
}
}
}

优缺点

优点:解决了固定窗口算法的窗口边界问题,避免突发流量压垮服务器。

缺点:还是存在限流不够平滑的问题。例如:限流是每秒3个,在第一毫秒发送了3个请求,达到限流,剩余窗口时间的请求都将会被拒绝,体验不好。

3.3 漏桶算法

实现原理

它的原理很简单,可以认为就是注水漏水的过程。往漏桶中以任意速率流入水,以固定的速率流出水。当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。

pkhFJsK.webp

  • 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的。
  • 桶的容量一般表示系统所能处理的请求数。
  • 如果桶的容量满了,就达到限流的阀值,就会丢弃水滴(拒绝请求)
  • 流出的水滴,是恒定过滤的,对应服务按照固定的速率处理请求。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class LeakyBucketRateLimiter {
Logger logger = LoggerFactory.getLogger(LeakyBucketRateLimiter.class);
//桶的容量
int capacity;
//桶中现存水量
AtomicInteger water = new AtomicInteger();
//开始漏水时间
long leakTimestamp;
//水流出的速率,即每秒允许通过的请求数
int leakRate;


public LeakyBucketRateLimiter(int capacity, int leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
}


public synchronized boolean tryAcquire() {
//桶中没有水, 重新开始计算
if (water.get() == 0) {
logger.info("start leaking");
leakTimestamp = System.currentTimeMillis();
water.incrementAndGet();
return water.get() < capacity;
}
//先漏水,计算剩余水量
long currentTime = System.currentTimeMillis();
int leakedWater = (int) ((currentTime - leakTimestamp) / 1000 * leakRate);
logger.info("lastTime:{}, currentTime:{}. LeakedWater:{}", leakTimestamp, currentTime, leakedWater);
//可能时间不足,则先不漏水
if (leakedWater != 0) {
int leftWater = water.get() - leakedWater;
//可能水已漏光。设为0
water.set(Math.max(0, leftWater));
leakTimestamp = System.currentTimeMillis();
}
logger.info("剩余容量:{}", capacity - water.get());
if (water.get() < capacity) {
logger.info("tryAcquire sucess");
water.incrementAndGet();
return true;
} else {
logger.info("tryAcquire fail");
return false;
}
}
}

优缺点

优点:

  1. 平滑流量。由于漏桶算法以固定的速率处理请求,可以有效地平滑和整形流量,避免流量的突发和波动(类似于消息队列的削峰填谷的作用)。

  2. 防止过载。当流入的请求超过桶的容量时,可以直接丢弃请求,防止系统过载。

缺点:

  1. 无法处理突发流量:由于漏桶的出口速度是固定的,无法处理突发流量。例如,即使在流量较小的时候,也无法以更快的速度处理请求。

  2. 可能会丢失数据:如果入口流量过大,超过了桶的容量,那么就需要丢弃部分请求。在一些不能接受丢失请求的场景中,这可能是一个问题。

  3. 不适合速率变化大的场景:如果速率变化大,或者需要动态调整速率,那么漏桶算法就无法满足需求。

  4. 资源利用率:不管当前系统的负载压力如何,所有请求都得进行排队,即使此时服务器的负载处于相对空闲的状态,这样会造成系统资源的浪费。

由于漏桶的缺陷比较明显,所以在实际业务场景中,使用的比较少。

3.4 令牌桶算法

实现原理

  • 有一个令牌管理员,根据限流大小,定速往令牌桶里放令牌。
  • 如果令牌数量满了,超过令牌桶容量的限制,那就丢弃。
  • 系统在接受到一个用户请求时,都会先去令牌桶要一个令牌。如果拿到令牌,那么就处理这个请求的业务逻辑;
  • 如果拿不到令牌,就直接拒绝这个请求。

pkhFUde.webp

代码实现

Guava中的RateLimiter就是基于令牌桶实现的,可以直接拿来使用。

简单的固定产生令牌实现

1
2
3
4
5
6
7
8
9
10
11

@Test
public void acquireTest() {
//每秒固定生成5个令牌
RateLimiter rateLimiter = RateLimiter.create(5);
for (int i = 0; i < 10; i++) {
double time = rateLimiter.acquire();
logger.info("等待时间:{}s", time);
}
}

优缺点

优点:

  1. 可以处理突发流量:令牌桶算法可以处理突发流量。当桶满时,能够以最大速度处理请求。这对于需要处理突发流量的应用场景非常有用。

  2. 限制平均速率:在长期运行中,数据的传输率会被限制在预定义的平均速率(即生成令牌的速率)。

  3. 灵活性:与漏桶算法相比,令牌桶算法提供了更大的灵活性。例如,可以动态地调整生成令牌的速率。

缺点:

  1. 可能导致过载:如果令牌产生的速度过快,可能会导致大量的突发流量,这可能会使网络或服务过载。

  2. 需要存储空间:令牌桶需要一定的存储空间来保存令牌,可能会导致内存资源的浪费。

  3. 实现稍复杂:相比于计数器算法,令牌桶算法的实现稍微复杂一些。

限流常见的四种实现方案

1. 基于 guava 限流实现(单机版)

guava 为谷歌开源的一个比较实用的组件,利用这个组件可以帮助开发人员完成常规的限流操作,接下来看具体的实现步骤

1、引入依赖:

1
2
3
4
5
6
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
12345

2、自定义限流注解

自定义一个限流用的注解,后面在需要限流的方法或接口上面只需添加该注解即可

1
2
3
4
5
6
7
8
9
10
@Documented
@Target(value = ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface GuavaLimitRateAnnotation {
// 限制类型
String limitType();

// 每秒 5 个请求
double limitCount() default 5d;
}

3、限流 AOP 类

通过AOP前置通知的方式拦截添加了上述自定义限流注解的方法,解析注解中的属性值,并以该属性值作为 guava 提供的限流参数,该类为整个实现的核心所在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Aspect
@Component
public class GuavaLimitRateAspect {

private static Logger logger = LoggerFactory.getLogger(GuavaLimitRateAspect.class);

@Before("execution(@GuavaLimitRateAnnotation * *(..))")
public void limit(JoinPoint joinPoint) {
// 1.获取当前方法
Method currentMethod = getCurrentMethod(joinPoint);
if (Objects.isNull(currentMethod)) {
return;
}
// 2.从方法注解定义上获取限流的类型
String limitType = currentMethod.getAnnotation(GuavaLimitRateAnnotation.class).limitType();
double limitCount = currentMethod.getAnnotation(GuavaLimitRateAnnotation.class).limitCount();
// 3.使用guava的令牌桶算法获取一个令牌,获取不到先等待
RateLimiter rateLimiter = RateLimitHelper.getRateLimiter(limitType, limitCount);
boolean b = rateLimiter.tryAcquire();
if (b) {
System.out.println("获取到令牌");
} else {
HttpServletResponse resp = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
JSONObject jsonObject = new JSONObject();
jsonObject.put("success",false);
jsonObject.put("msg","限流中");
try {
output(resp, jsonObject.toJSONString());
} catch (Exception e) {
logger.error("error,e:{}", e);
}
}
}

public void output(HttpServletResponse response, String msg) throws IOException {
response.setContentType("application/json;charset=UTF-8");
ServletOutputStream outputStream = null;
try {
outputStream = response.getOutputStream();
outputStream.write(msg.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
} finally {
assert outputStream != null;
outputStream.flush();
outputStream.close();
}
}

private Method getCurrentMethod(JoinPoint joinPoint) {
Method[] methods = joinPoint.getTarget().getClass().getMethods();
Method target = null;
for (Method method : methods) {
if (method.getName().equals(joinPoint.getSignature().getName())) {
target = method;
break;
}
}
return target;
}

}

其中限流的核心 API 即为 RateLimiter 这个对象,涉及到的 RateLimitHelper 类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class RateLimitHelper {

private RateLimitHelper(){}

private static Map<String, RateLimiter> rateMap = new HashMap<>();

public static RateLimiter getRateLimiter(String limitType, double limitCount ){
RateLimiter rateLimiter = rateMap.get(limitType);
if(rateLimiter == null){
rateLimiter = RateLimiter.create(limitCount);
rateMap.put(limitType,rateLimiter);
}
return rateLimiter;
}

}

4、测试

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@RequestMapping("/limit")
public class LimitController {

@GetMapping("/limitByGuava")
@GuavaLimitRateAnnotation(limitType = "测试限流", limitCount = 1)
public String limitByGuava() {
return "limitByGuava";
}

}
1234567891011

在接口中为了模拟出效果,我们将参数设置的非常小,即QPS为1,可以预想当每秒请求超过1时将会出现被限流的提示,启动工程并验证接口,每秒1次的请求,可以正常得到结果,效果如下:

pkhFbeU.png


2. 基于 sentinel 限流实现(分布式版)

sentinel 通常是需要结合 springcloud-alibaba 框架一起实用的,而且与框架集成之后,可以配合控制台一起使用达到更好的效果,实际上,sentinel 官方也提供了相对原生的 SDK 可供使用,接下来就以这种方式进行整合。

1、引入依赖

1
2
3
4
5
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.0</version>
</dependency>

2、自定义限流注解

1
2
3
4
5
6
7
8
9
10
11
12
@Documented
@Target(value = ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface SentinelLimitRateAnnotation {

// 限制类型
String resourceName();

// 每秒 5 个
int limitCount() default 5;

}

3、自定义AOP类实现限流

该类的实现思路与上述使用guava类似,不同的是,这里使用的是sentinel原生的限流相关的API,对此不够属性的可以查阅官方的文档进行学习,这里就不展开来说了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Aspect
@Component
public class SentinelLimitRateAspect {

@Pointcut(value = "@annotation(com.hcr.sbes.limit.sentinel.SentinelLimitRateAnnotation)")
public void rateLimit() {

}

@Around("rateLimit()")
public Object around(ProceedingJoinPoint joinPoint) {
// 1.获取当前方法
Method currentMethod = getCurrentMethod(joinPoint);
if (Objects.isNull(currentMethod)) {
return null;
}
// 2.从方法注解定义上获取限流的类型
String resourceName = currentMethod.getAnnotation(SentinelLimitRateAnnotation.class).resourceName();
if(StringUtils.isEmpty(resourceName)){
throw new RuntimeException("资源名称为空");
}
int limitCount = currentMethod.getAnnotation(SentinelLimitRateAnnotation.class).limitCount();
// 3.初始化规则
initFlowRule(resourceName,limitCount);
Entry entry = null;
Object result = null;
try {
entry = SphU.entry(resourceName);
try {
result = joinPoint.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级,在此处进行相应的处理操作
System.out.println("blocked");
return "被限流了";
} catch (Exception e) {
Tracer.traceEntry(e, entry);
} finally {
if (entry != null) {
entry.exit();
}
}
return result;
}

private static void initFlowRule(String resourceName,int limitCount) {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
//设置受保护的资源
rule.setResource(resourceName);
//设置流控规则 QPS
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//设置受保护的资源阈值
rule.setCount(limitCount);
rules.add(rule);
//加载配置好的规则
FlowRuleManager.loadRules(rules);
}

private Method getCurrentMethod(JoinPoint joinPoint) {
Method[] methods = joinPoint.getTarget().getClass().getMethods();
Method target = null;
for (Method method : methods) {
if (method.getName().equals(joinPoint.getSignature().getName())) {
target = method;
break;
}
}
return target;
}

}

4、测试接口

1
2
3
4
5
@GetMapping("/limitBySentinel")
@SentinelLimitRateAnnotation(resourceName = "测试限流2", limitCount = 1)
public String limitBySentinel() {
return "limitBySentinel";
}

3. 基于redisson限流实现(分布式版)

1. 引入redisson依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.32.0</version>
</dependency>

2. 编写配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.yupi.springbootinit.config;

import lombok.Data;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
// 从application.yml文件中读取前缀为"spring.redis"的配置项
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {

private Integer database;

private String host;

private Integer port;
// 如果redis默认没有密码,则不用写
//private String password;

// spring启动时,会自动创建一个RedissonClient对象
@Bean
public RedissonClient getRedissonClient() {
// 1.创建配置对象
Config config = new Config();
// 添加单机Redisson配置
config.useSingleServer()
// 设置数据库
.setDatabase(database)
// 设置redis的地址
.setAddress("redis://" + host + ":" + port);
// 设置redis的密码(redis有密码才设置)
// .setPassword(password);

// 2.创建Redisson实例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}

3. 编写RedisLimiterManager

编写 RedisLimiterManager:什么是 Manager?专门提供 RedisLimiter 限流基础服务的(提供了通用的能力,可以放到任何一个项目里)。

4. 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yupi.springbootinit.manager;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
class RedisLimiterManagerTest {

@Resource
private RedisLimiterManager redisLimiterManager;

@Test
void doRateLimit() throws InterruptedException {
// 模拟一下操作
String userId = "1";
// 瞬间执行2次,每成功一次,就打印'成功'
for (int i = 0; i < 2; i++) {
redisLimiterManager.doRateLimit(userId);
System.out.println("成功");
}
// 睡1秒
Thread.sleep(1000);
// 瞬间执行5次,每成功一次,就打印'成功'
for (int i = 0; i < 5; i++) {
redisLimiterManager.doRateLimit(userId);
System.out.println("成功");
}
}
}

4. 基于网关 gateway 限流(分布式版)

实现算法参照上面的令牌桶实现,gateway里面的拦截器编写限流逻辑就可以了!

文章结束!