自己实现限流器(二): Redis+Lua分布式令牌桶
发布时间:
最后更新:
在上一篇里咱写了一个基于令牌桶算法的限流器:https://blog.kaciras.com/article/12/implement-token-bucket-algorithm-from-scratch,写完我就想起来在查资料的过程中经常看到什么“分布式”之类的字眼,虽然我的小博客跟分布式是没有丝毫关系的,但还是手痒痒,毕竟分布式这么高逼格的东西咱也想参合一下。
实现方案 #
要让多个服务共享一个限流器,那么首先要把状态共享,毕竟算法是固定的可以在每个服务里自己算去,状态就是当前令牌数
和最后访问时间
这两个数据。存储数据的服务用Redis是再好不过了,性能高又方便,于是就有了基本的方针:
所有访问者的请求一律通过网关上的限流器检查,相关数据存储在一个Redis数据库里即可。可是这又有个线程安全问题,数据从Redis里取回,经过计算后再把剩余令牌和访问时间存到Redis里,这跟多个线程做i++
运算一样会出现数据不一致的情况。
解决线程安全问题的一个方案是加锁,但是一旦加锁,那就跟分布式高性能等高逼格技术无缘了,所以万万锁不得。幸好Redis提供了一个解决方案:把代码搬进Redis里运行!
在Redis里使用Lua脚本 #
因为Redis是单线程的,所以在它里面抛代码不存在线程安全问题,就像是在同步块里一样。Redis里怎么用Lua详见官方文档 https://redis.io/commands/eval,我这就不说了因为太长。
首先确定逻辑范围,即哪些代码放在Redis里,哪些放在网关里。根据最小锁定原则(同步块里的代码越少越好),毕竟Redis要承载高并发,不能在里面消耗太多时间,Lua脚本应当只包含取数据、计算、更新数据这三个逻辑,其它的像参数检查等逻辑全部放到网关层去做。
根据公式当前令牌数 = min(上次剩余 + 添加速率 * 时间差, 桶容量)
,时间差 = 当前时间 - 上次获取时间
,上次获取时间和令牌的使用Hash结构存储在Redis里,需要传入的参数有:
- requirement: 本次访问所需的令牌数
- now: 当前时间
- bucketSize: 桶容量
- rate: 添加速率
为什么需要传入当前时间?在脚本里获取不行么?
当然不行,Lua里取当前时间的API在os
模块里,而该模块除了获取时间之外还有其它访问系统环境的API,为了安全性就被Redis给移除了,所以时间是取不到的;另外将时间作为参数有利于Mock,在单元测试的时候可以随便模拟经过了多久。
参数传递部分代码:
local requirement = tonumber(ARGV[1])
local now = tonumber(ARGV[2])
local bucketSize = tonumber(ARGV[3])
local rate = tonumber(ARGV[4])
-- time: 最后访问时间,permits: 剩余令牌数
local data = redis.call("HMGET", KEYS[1], "time", "permits")
local lastAcquire = data[1]
local currPermits = data[2]
第二步是计算当前令牌数currPermits
,如果是首次访问lastAcquire
为nli
,则当前令牌数设为桶容量,最后获取时间设为当前时间,否则就计算:
if not lastAcquire then
lastAcquire = now
currPermits = bucketSize
else
currPermits = math.min(bucketSize, currPermits + (now - lastAcquire) * rate)
end
最后把当前令牌数根要获取的数量做比较,如果足够就更新数据,不够就算一下需要等多久,最后更新数据的过期时间,返回需要等待的时间。
local timeToWait = 0
if requirement <= currPermits then
redis.call("HMSET", KEYS[1], "time", now, "permits", currPermits - requirement)
else
--- 注意:Lua 的浮点数直接返回会被 Redis 向下截断成整数导致事件偏小,这里保守起见向上取整
timeToWait = math.ceil((requirement - currPermits) / rate)
end
redis.call("EXPIRE", KEYS[1], tonumber(ARGV[5]))
return timeToWait
在 redis-cli 里使用:
> redis-cli SCRIPT LOAD "$(cat RateLimiter.lua)"
[sha1 value]
> redis-cli
> EVAL[sha1 value] 1 [key] 所需令牌 当前时间 桶容量 速率 键超时
例如访问者IP为192.168.0.1,一次需要8个令牌,桶容量10,每秒添加一个令牌,首次访问5秒后再访问,此时桶里还缺一个令牌:
> redis-cli SCRIPT LOAD "$(cat RateLimiter.lua)"
xxxxx
> redis-cli
> EVALSHA xxxxx 1 192.168.0.1 8 60 10 1 360
0
> EVALSHA xxxxx 1 192.168.0.1 8 65 10 1 360
1
网关层代码 #
本站的后端使用Java + Spring全家桶写的,使用SpringData的RedisTemplate作为Redis的接口,代码很简单就不细说了:
@Setter
public final class RedisTokenBucket implements RateLimiter {
private static final String SCRIPT_FILE = "RateLimiter.lua";
private final Clock clock;
private final RedisTemplate<String, Object> redisTemplate;
private final RedisScript<Long> script;
private int bucketSize;
private double rate;
public RedisTokenBucket(Clock clock, RedisTemplate<String, Object> redisTemplate) {
this.clock = clock;
this.redisTemplate = redisTemplate;
var script = new DefaultRedisScript<Long>();
script.setResultType(Long.class);
script.setLocation(new ClassPathResource(SCRIPT_FILE));
this.script = script;
}
/**
* 获取指定数量的令牌,返回桶内拥有足够令牌所需要等待的时间。
*
* @param id 标识获取者的身份,一般是对方的IP之类的
* @param permits 要获取的令牌数量
* @return 需要等待的时间(秒),0表示成功,小于0表示永远无法完成
*/
public long acquire(@NonNull String id, int permits) {
if (permits == 0) {
return 0;
}
if (permits > bucketSize) {
return -1;
}
var now = clock.instant().getEpochSecond();
var ttl = bucketSize / rate;
var waitTime = redisTemplate.execute(script, List.of(id), permits, now, bucketSize, rate, ttl);
if (waitTime == null) {
throw new RuntimeException("限速脚本返回了空值,ID=" + id);
}
return waitTime;
}
代码非常简单,需要注意的是最后还传入了数据的过期时间ttl
,它就等于桶加满所需的时间: 桶容量 / 添加速率
。
最后 #
经测试脚本的性能相当高,跟Redis内置的命令在同一水平,详见https://github.com/kaciras-blog/java-infra/blob/old-token-bucket/src/test/resources/RateLimiterBenchmark.txt
Lua代码https://github.com/kaciras-blog/java-infra/blob/old-token-bucket/src/main/resources/RateLimiter.lua
注:上述代码是博客项目的一部分,因为我懒,没有单独开一个仓库,而原项目后续可能会有更改,查看本文对应的代码请认准 TAG:old-token-bucket