99网
您的当前位置:首页OpenResty单机限流

OpenResty单机限流

来源:99网


前言: 上一篇文章介绍了OpenResty的安装和lua的基本语法学习,这一篇介绍一下使用OpenResty进行限流。本文只对限流功能实现做简单介绍,不对限流算法原理做过多解释。

该项目已经实现的限流方法

  • 固定窗口(单位时间内接口可以请求的数量)
  • 令牌桶
  • 计数器(接口并发连接数)

限流流程图

限流规则存储

针对接口限流

限流算法实现主要代码部分

固定窗口

time_range_request_count(rule_detail)
    local request_uri = ngx.var.uri
    local limitTimeRange = rule_detail.limitTimeRange
    local limitNum = rule_detail.limitNum
    if not limitTimeRange or not limitNum then
        ngx.log(ngx.ERR, "limitTimeRange or limitNum is nil")
        return
    end
    -- 检查 limitTimeRange 和 limitNum 的类型
    if type(limitTimeRange) ~= "number" or type(limitNum) ~= "number" then
        ngx.log(ngx.ERR, "limitTimeRange 或 limitNum 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))
        return
    end

    local lim, err = limit_count.new("api_rate_limit_req_store", limitNum, limitTimeRange)
    if not lim then
        ngx.log(ngx.ERR, "创建限流器失败 err: ", err.." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
        --return ngx.exit(500)
        baseUtil.api_rate_limit_res(500,500,"服务器异常")
    end
    local delay, response = lim:incoming(request_uri, true)
    if not delay then
        if response == "rejected" then
            ngx.log(ngx.ERR, "请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
            --return ngx.exit(429)
            baseUtil.api_rate_limit_res(429,429,"服务器异常")
        end
        ngx.log(ngx.ERR, "failed to limit count: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
        --return ngx.exit(500)
        baseUtil.api_rate_limit_res(500,500,"服务器异常")
    end
end

return {
    time_range_request_count = time_range_request_count
}

令牌桶

local limit_req = require "resty.limit.req"
local cjson = require "cjson"
local baseUtil = require "BaseUtil"

-- 令牌桶算法限流
local function token_bucket_algorithm(rule_detail)
    local request_uri = ngx.var.uri
    -- 限流速率 r/s
    local rate = rule_detail.rate
    -- 令牌数量
    local burst = rule_detail.burst
    -- 检查 rate 和 burst 的类型
    if type(rate) ~= "number" or type(burst) ~= "number" then
        ngx.log(ngx.ERR, "rate 或 burst 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))
        return
    end
    local lim, err = limit_req.new("api_rate_limit_req_store", rate, burst)
    if not lim then
        ngx.log(ngx.ERR, "令牌桶限流,创建限流器失败", err)
        --return ngx.exit(500)
        baseUtil.api_rate_limit_res(500,500,"服务器异常")
    end
    -- 调用限流器
    local delay, response = lim:incoming(request_uri, true)
    if not delay then
        if response == "rejected" then
            ngx.log(ngx.ERR, "请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
            --return ngx.exit(429)  -- 返回 HTTP 429 状态码
            baseUtil.api_rate_limit_res(429,429,"服务器异常")
        end
        ngx.log(ngx.ERR, "调用限流器失败: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..request_uri)
        --return ngx.exit(500)
        baseUtil.api_rate_limit_res(500,500,"服务器异常")
    end
    -- 如果请求没有超过,正常处理请求
    if delay >= 0 then
        ngx.sleep(delay)
    end
end

return {
    token_bucket_algorithm = token_bucket_algorithm
}

并发数

local limit_conn = require "resty.limit.conn"
local cjson = require "cjson"
local baseUtil = require "BaseUtil"

-- 最大并发连接数
local function concurrent_connections(rule_detail)
    local key = ngx.var.uri
    local concurrentNum = rule_detail.concurrentNum
    local concurrentExtraNum = rule_detail.concurrentExtraNum
    local concurrentTime = rule_detail.concurrentTime
    -- 检查 concurrentNum、concurrentExtraNum、concurrentTime 的类型
    if type(concurrentNum) ~= "number" or type(concurrentExtraNum) ~= "number" or type(concurrentTime) ~= "number"then
        ngx.log(ngx.ERR, "concurrentNum 或 concurrentExtraNum 或 concurrentTime 的类型不是数字","rule_detail: "..cjson.encode(rule_detail))
        return
    end
    local lim, err = limit_conn.new("api_rate_limit_req_store", concurrentNum, concurrentExtraNum, concurrentTime)
    if not lim then
        ngx.log(ngx.ERR, "最大并发连接数,创建限流器失败", err)
        baseUtil.api_rate_limit_res(500,500,"服务器异常")
    end
    local delay, response = lim:incoming(key, true)
    if not delay then
        if response == "rejected" then
            ngx.log(ngx.ERR, "请求数 rejected: "," rule_detail: "..cjson.encode(rule_detail).." uri: "..key)
            baseUtil.api_rate_limit_res(429,429,"服务器异常")
        end
        ngx.log(ngx.ERR, "failed to limit req: ", tostring(response).." rule_detail: "..cjson.encode(rule_detail).." uri: "..key)
        baseUtil.api_rate_limit_res(500,500,"服务器异常")
    end

    -- 如果请求没有超过并发,那么将连接信息添加到ngx.ctx中
    if lim:is_committed() then
        local ctx = ngx.ctx
        ctx.limit_type = "concurrent_connections"
        ctx.limit_conn = lim
        ctx.limit_conn_key = key
        ctx.limit_conn_delay = delay
    end

    if delay >= 0.001 then
        ngx.log(ngx.WARN, "delaying conn, excess ", delay, "s per binary_remote_addr by limit_conn_store")
        ngx.sleep(delay)
    end
end

-- 请求离开,减少数量
local function concurrent_connections_leaving()
    local ctx = ngx.ctx
    local lim = ctx.limit_conn
    if lim then
        local key = ctx.limit_conn_key
        local latency = ctx.limit_conn_delay
        local limit_type = ctx.limit_type
        if not limit_type then
            return
        end
        if limit_type ~= "concurrent_connections" then
            return
        end
        --ngx.log(ngx.ERR, "释放连接", "request: ", key.." latency: "..latency)
        local conn, err = lim:leaving(key, latency)
        if not conn then
            ngx.log(ngx.ERR, "failed to record the connection leaving ", "request: ", err.." key: "..key)
            return
        end
    end
end

return {
    concurrent_connections = concurrent_connections,
    concurrent_connections_leaving = concurrent_connections_leaving
}

注意点

  • 实际情况下,生产nginx会存在多台,所以这里将规则存储在redis当中
  • OpenResty本身只支持单机的redis连接,所以这里需要单独引入redis集群模块。参考地址:https://developer.aliyun.com/article/1334650

因篇幅问题不能全部显示,请点此查看更多更全内容