系统存在服务上限,流量超过服务上限会导致系统卡死、崩溃。
限流:为了在高并发时系统稳定可用,牺牲或延迟部分请求流量以保证系统整体服务可用。成都创新互联公司专业为企业提供海西网站建设、海西做网站、海西网站设计、海西网站制作等企业网站建设、网页设计与制作、海西企业网站模板建站服务,10余年海西做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
限流算法
漏桶和令牌桶对比
相关阅读:
- 利用 Spring 拦截器实现
- 使用方式:Controller 方法或类加上限流注解,请求到达拦截器时进行拦截处理
- 使用 Redis 记录数据,Lua 保证多个命令原子性执行。
使用示例
@RestController
@RequestMapping("/ratelimit/custom")
@RateLimit(threshold = 10, rateLimiter = RateLimiterEnum.FIXED_WINDOW, time = 10, timeUnit = TimeUnit.SECONDS)
public class RateLimitController {
@GetMapping("/fixed/window")
@RateLimit(threshold = 10, rateLimiter = RateLimiterEnum.FIXED_WINDOW, time = 10, timeUnit = TimeUnit.SECONDS)
public ResponseResult fixedWindow(Long id) {
id += RandomUtil.randomLong();
log.info("custom:fixedWindow:{}", id);
return ResponseResult.success("custom:fixedWindow:" + id);
}
}
限流注解 RateLimit.java
key() + keyType()
rateLimiter()
threshold()
time() + timeUnit()
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (!(handler instanceof HandlerMethod)) {
return true;
}
HandlerMethod handlerMethod = ((HandlerMethod) handler);
// 从方法和类上获取注解
RateLimit annotation = AspectUtil.findMethodOrClassAnnotation(handlerMethod.getMethod(),
RateLimit.class);
if (annotation == null) {
return true;
}
AspectKeyTypeEnum.KeyTypeData data = AspectKeyTypeEnum.KeyTypeData.builder()
.prefix("rate:limit").key(annotation.key()).build();
String limitKey = annotation.keyType()
.obtainTypeKey(handlerMethod.getMethod(), handlerMethod.getMethodParameters(), data);
RateLimiterEnum limiterEnum = annotation.rateLimiter();
// 执行限流脚本
Long isLimit = redisUtil.execute(limiterEnum.obtainScript(),
Lists.newArrayList(limitKey), limiterEnum.obtainArgvs(annotation).toArray());
if (isLimit != null && isLimit != 0L) {
return true;
}
throw new ResponseException(ResponseEnum.RATE_LIMITED);
}
限流算法 lua 脚本
```lua
-- 限流key ,string 保存调用限流的次数
local key = KEYS[1];
-- 最大访问量
local capacity = tonumber(ARGV[1]);
-- 限流时长(毫秒)
local ttl = tonumber(ARGV[2]);
local count = redis.call('INCR', key);
if (count == 1) then
-- 首次访问设置过期时间
redis.call('PEXPIRE', key, ttl);
end
local res = 0;
if (count <= capacity) then
res = 1;
end
-- 被限流返回0,未被限流返回1
return res;
```
```lua
-- 限流 key , zset 保存未被限流的 id 与时间戳
local key = KEYS[1];
-- 最大访问量
local capacity = tonumber(ARGV[1]);
-- 限流时长(毫秒)
local ttl = tonumber(ARGV[2]);
-- 当前时间戳(毫秒)
local now = tonumber(ARGV[3]);
-- 唯一ID
local ukid = ARGV[4];
-- 清除过期的数据
redis.call('ZREMRANGEBYSCORE', key, 0, now - ttl);
local count = redis.call('ZCARD', key);
local res = 0;
if (count < capacity) then
-- 往 zset 中添加一个值、得分均为当前时间戳的元素,[value,score]
redis.call("ZADD", key, now, ukid);
-- 重置 zset 的过期时间,单位毫秒
redis.call("PEXPIRE", key, ttl);
res = 1;
end
-- 被限流返回0,未被限流返回1
return res;
```
```lua
-- 限流 key , hash 保存限流相关信息
local key = KEYS[1];
-- 最大访问量
local capacity = tonumber(ARGV[1]);
-- 限流时长(毫秒)
local ttl = tonumber(ARGV[2]);
-- 当前时间戳(毫秒)
local now = tonumber(ARGV[3]);
-- 水流出速率(每毫秒)
local rate = tonumber(ARGV[4]);
-- 限流信息
local info = redis.call("HMGET", key, "last_time", "stored_water");
-- 上次处理时间
local last_time = tonumber(info[1]);
-- 当前存储的水量,默认为0,存在保存值使用保存值
local stored_water = tonumber(info[2]);
if (stored_water == nil) then
stored_water = 0;
end
if (last_time ~= nil) then
-- 根据上次处理时间和当前时间差,计算流出后的水量
local leaked_water = math.floor((now - last_time) * rate);
stored_water = math.max(stored_water - leaked_water, 0);
if (leaked_water > 0) then
last_time = nil;
end
end
-- 首次访问、泄露了水 设置上次处理时间
if (last_time == nil) then
redis.call("HSET", key, "last_time", now);
end
-- 被限流返回0,未被限流返回1
local res = 0;
if (capacity > stored_water) then
redis.call("HSET", key, "stored_water", stored_water + 1);
res = 1;
end
redis.call("PEXPIRE", key, ttl);
return res;
```
```lua
-- 限流 key , hash 保存限流相关信息
local key = KEYS[1];
-- 最大访问量
local capacity = tonumber(ARGV[1]);
-- 限流时长(毫秒)
local ttl = tonumber(ARGV[2]);
-- 当前时间戳(毫秒)
local now = tonumber(ARGV[3]);
-- 生成令牌速率(每毫秒)
local rate = tonumber(ARGV[4]);
-- 限流信息
local info = redis.call("HMGET", key, "last_time", "stored_tokens");
-- 上次处理时间
local last_time = tonumber(info[1]);
-- 令牌数量,默认为最大访问量,存在保存值使用保存值
local stored_tokens = tonumber(info[2]);
if (stored_tokens == nil) then
stored_tokens = capacity;
end
if (last_time ~= nil) then
-- 根据上次处理时间和当前时间差,触发式往桶里添加令牌
local add_tokens = math.floor((now - last_time) * rate);
stored_tokens = math.min(add_tokens + stored_tokens, capacity);
if (add_tokens > 0) then
last_time = nil;
end
end
-- 首次访问、添加了令牌 设置上次处理时间
if (last_time == nil) then
redis.call("HSET", key, "last_time", now);
end
-- 被限流返回0,未被限流返回1
local res = 0;
if (stored_tokens > 0) then
redis.call("HSET", key, "stored_tokens", stored_tokens - 1);
res = 1;
end
redis.call("PEXPIRE", key, ttl);
return res;
```
demo 地址:https://github.com/EastX/java-practice-demos/tree/main/demo-ratelimit
推荐阅读: