前言: 上一篇文章介绍了OpenResty的安装和lua的基本语法学习,这一篇介绍一下使用OpenResty进行限流。本文只对限流功能实现做简单介绍,不对限流算法原理做过多解释。
该项目已经实现的限流方法
- 固定窗口(单位时间内接口可以请求的数量)
- 令牌桶
- 计数器(接口并发连接数)
限流流程图
限流规则存储
针对接口限流
限流算法实现主要代码部分
固定窗口
time_range_request_count(rule_detail)
local request_uri = ngx.var.uri
local limitTimeRange = rule_detail.limitTimeRange
local limitNum = rule_detail.limitNum
if not limitTimeRange or not limitNum then
ngx.log(ngx.ERR, "limitTimeRange or limitNum is nil")
return
end
-- 检查 limitTimeRange 和 limitNum 的类型
if type(limitTimeRange) ~= "number" or type(limitNum) ~= "number" then
ngx.log(ngx.ERR, "limitTimeRange 或 limitNum 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))
return
end
local lim, err = limit_count.new("api_rate_limit_req_store", limitNum, limitTimeRange)
if not lim then
ngx.log(ngx.ERR, "创建限流器失败 err: ", err.." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
--return ngx.exit(500)
baseUtil.api_rate_limit_res(500,500,"服务器异常")
end
local delay, response = lim:incoming(request_uri, true)
if not delay then
if response == "rejected" then
ngx.log(ngx.ERR, "请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
--return ngx.exit(429)
baseUtil.api_rate_limit_res(429,429,"服务器异常")
end
ngx.log(ngx.ERR, "failed to limit count: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
--return ngx.exit(500)
baseUtil.api_rate_limit_res(500,500,"服务器异常")
end
end
return {
time_range_request_count = time_range_request_count
}
令牌桶
local limit_req = require "resty.limit.req"
local cjson = require "cjson"
local baseUtil = require "BaseUtil"
-- 令牌桶算法限流
local function token_bucket_algorithm(rule_detail)
local request_uri = ngx.var.uri
-- 限流速率 r/s
local rate = rule_detail.rate
-- 令牌数量
local burst = rule_detail.burst
-- 检查 rate 和 burst 的类型
if type(rate) ~= "number" or type(burst) ~= "number" then
ngx.log(ngx.ERR, "rate 或 burst 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))
return
end
local lim, err = limit_req.new("api_rate_limit_req_store", rate, burst)
if not lim then
ngx.log(ngx.ERR, "令牌桶限流,创建限流器失败", err)
--return ngx.exit(500)
baseUtil.api_rate_limit_res(500,500,"服务器异常")
end
-- 调用限流器
local delay, response = lim:incoming(request_uri, true)
if not delay then
if response == "rejected" then
ngx.log(ngx.ERR, "请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
--return ngx.exit(429) -- 返回 HTTP 429 状态码
baseUtil.api_rate_limit_res(429,429,"服务器异常")
end
ngx.log(ngx.ERR, "调用限流器失败: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
--return ngx.exit(500)
baseUtil.api_rate_limit_res(500,500,"服务器异常")
end
-- 如果请求没有超过,正常处理请求
if delay >= 0 then
ngx.sleep(delay)
end
end
return {
token_bucket_algorithm = token_bucket_algorithm
}
并发数
local limit_conn = require "resty.limit.conn"
local cjson = require "cjson"
local baseUtil = require "BaseUtil"
-- 最大并发连接数
local function concurrent_connections(rule_detail)
local key = ngx.var.uri
local concurrentNum = rule_detail.concurrentNum
local concurrentExtraNum = rule_detail.concurrentExtraNum
local concurrentTime = rule_detail.concurrentTime
-- 检查 concurrentNum、concurrentExtraNum、concurrentTime 的类型
if type(concurrentNum) ~= "number" or type(concurrentExtraNum) ~= "number" or type(concurrentTime) ~= "number"then
ngx.log(ngx.ERR, "concurrentNum 或 concurrentExtraNum 或 concurrentTime 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))
return
end
local lim, err = limit_conn.new("api_rate_limit_req_store", concurrentNum, concurrentExtraNum, concurrentTime)
if not lim then
ngx.log(ngx.ERR, "最大并发连接数,创建限流器失败", err)
baseUtil.api_rate_limit_res(500,500,"服务器异常")
end
local delay, response = lim:incoming(key, true)
if not delay then
if response == "rejected" then
ngx.log(ngx.ERR, "请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..key)
baseUtil.api_rate_limit_res(429,429,"服务器异常")
end
ngx.log(ngx.ERR, "failed to limit req: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..key)
baseUtil.api_rate_limit_res(500,500,"服务器异常")
end
-- 如果请求没有超过并发,那么将连接信息添加到ngx.ctx中
if lim:is_committed() then
local ctx = ngx.ctx
ctx.limit_type = "concurrent_connections"
ctx.limit_conn = lim
ctx.limit_conn_key = key
ctx.limit_conn_delay = delay
end
if delay >= 0.001 then
ngx.log(ngx.WARN, "delaying conn, excess ", delay, "s per binary_remote_addr by limit_conn_store")
ngx.sleep(delay)
end
end
-- 请求离开,减少数量
local function concurrent_connections_leaving()
local ctx = ngx.ctx
local lim = ctx.limit_conn
if lim then
local key = ctx.limit_conn_key
local latency = ctx.limit_conn_delay
local limit_type = ctx.limit_type
if not limit_type then
return
end
if limit_type ~= "concurrent_connections" then
return
end
--ngx.log(ngx.ERR, "释放连接", "request: ", key.." latency: "..latency)
local conn, err = lim:leaving(key, latency)
if not conn then
ngx.log(ngx.ERR, "failed to record the connection leaving ", "request: ", err.." key: "..key)
return
end
end
end
return {
concurrent_connections = concurrent_connections,
concurrent_connections_leaving = concurrent_connections_leaving
}
注意点
- 实际情况下,生产nginx会存在多台,所以这里将规则存储在redis当中
- OpenResty本身只支持单机的redis连接,所以这里需要单独引入redis集群模块。参考地址:https://developer.aliyun.com/article/1334650