前段时间在使用 springcloud gateway 的时候,发现 gateway 自带了限流功能,下面先介绍如何使用

使用方法

gateway提供了以下几种限流方式

1. 根据IP限流
1
2
3
4
@Bean
public KeyResolver ipKeyResolver() {
return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest() .getRemoteAddress()).getHostName());
}
2. 参数限流(这里演示为以 userId 参数限流)
1
2
3
4
@Bean
public KeyResolver userKeyResolver() {
return exchange -> Mono.just(Objects.requireNonNull(exchange.getRequest() .getQueryParams().getFirst("userId")));
}
3.请求地址限流
1
2
3
4
@Bean
public KeyResolver apiKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getPath().value());
}
4. 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
- id: user-server
uri: lb://user-server
order: 0
predicates:
- Path=/user/**
filters:
- StripPrefix=1
- name: RequestRateLimiter
args:
# 允许用户每秒处理多少个请求
redis-rate-limiter.replenishRate: 10
# 令牌桶的容量,允许在一秒钟内完成的最大请求数
redis-rate-limiter.burstCapacity: 20
key-resolver: "#{@ipKeyResolver}"

源码分析

先上Java代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* routeId:路由ID
* id:redis 键值组成部分,根据限流策略生成的ID
*/
public Mono<Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
} else {
// 通过路由ID获取 路由配置
RedisRateLimiter.Config routeConfig = this.loadConfiguration(routeId);
// 令牌桶每秒填充平均速率
int replenishRate = routeConfig.getReplenishRate();
// 每秒向令牌桶中发放的 令牌数量,一秒内的最大请求数量
int burstCapacity = routeConfig.getBurstCapacity();

try {
// 初始化键值
// request_rate_limiter.{id}.tokens
// request_rate_limiter.{id}.timestamp
List<String> keys = getKeys(id);

// 初始化参数 ==> 用户每秒限流数,令牌桶容量,秒级时间戳,“1”
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
// 执行 LUA 脚本
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
return flux.onErrorResume((throwable) -> {
return Flux.just(Arrays.asList(1L, -1L));
}).reduce(new ArrayList(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map((results) -> {
boolean allowed = (Long)results.get(0) == 1L;
Long tokensLeft = (Long)results.get(1);
Response response = new Response(allowed, this.getHeaders(routeConfig, tokensLeft));
if (this.log.isDebugEnabled()) {
this.log.debug("response: " + response);
}

return response;
});
} catch (Exception var9) {
this.log.error("Error determining if user allowed from redis", var9);
return Mono.just(new Response(true, this.getHeaders(routeConfig, -1L)));
}
}
}

接下来是重头戏,request_rate_limiter.lua:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
-- request_rate_limiter.{id}.tokens
local tokens_key = KEYS[1]

-- request_rate_limiter.{id}.timestamp
local timestamp_key = KEYS[2]

-- 令牌桶每秒填充平均速率
local rate = tonumber(ARGV[1])
-- 令牌桶最大容量
local capacity = tonumber(ARGV[2])
-- 秒级时间戳
local now = tonumber(ARGV[3])
-- 1
local requested = tonumber(ARGV[4])

-- 填充时间(无请求时填充满桶的时间)
local fill_time = capacity/rate
-- 过期时间为两个填充时间
local ttl = math.floor(fill_time*2)

-- 令牌桶中剩余的令牌数量
local last_tokens = tonumber(redis.call("get", tokens_key))
-- 若桶为空, 说明在令牌桶的过期时间内没有请求,因此将桶中的令牌填满
if last_tokens == nil then
last_tokens = capacity
end

-- 上一次请求的时间戳
local last_refreshed = tonumber(redis.call("get", timestamp_key))
-- 如果上一次请求的时间戳为nil,则说明此次请求为第一次请求或者上次请求间隔过久导致令牌桶过期
if last_refreshed == nil then
last_refreshed = 0
end

-- 与上一次请求的时间差
local delta = math.max(0, now-last_refreshed)

-- 令牌桶填充后应该有的令牌数量
-- 当redis中没有桶,则填充的令牌数量为 capacity(最大容量)
-- 桶存在时,根据速率和时间差计算要填充的数量(delta*rate),与剩余的令牌数量相加得到填充后的令牌数量
-- 填充后的数量与最大容量取最小值,防止令牌数量溢出
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))

-- 如果填充后令牌数量还是小于1,则说明该请求被限流,拦截
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
-- 未被拦截,令牌数量减一
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

-- 返回通过的请求数量(1),令牌桶中剩余的令牌数量
return { allowed_num, new_tokens }

大致的流程和原理都写在了注释当中。

为什么是LUA

为什么用LUA脚本,而不在Java代码中操作Redis,或者使用Python?

因为Redis对于LUA脚本有独特的支持,使用LUA脚本能够保证脚本中的操作原子性执行,其它操作无法穿插其中。而其它语言目前无法实现Redis多个操作的原子性。