限流与常见实现

限流也是一个系统中老生常谈的话题了,因为资源不是无限的,因此系统总会达到一个瓶颈,我们不可能接受无限的流量直到系统崩溃,于是也就有了限流策略。

多少流量该限流

一般来说,我们有几种方法可以来对系统进行评估:

  1. 正统做法:压测。通过压测对当前系统进行评估,就可以知道单机可承载的 QPS,从而进行整体的限流评估。(注意:限流往往是分布式,而不是单机的,因此单机压测后需要 * N)
  2. 懒狗做法:当然,好多野鸡服务可能是不太会做压测的,这类服务通常都不是重保类的服务,在刚上线时也不太会有多大问题,那么我们可以先不设限流,运行一段时间,来评估正常流量,以正常流量的两到三倍作为异常。

名词解释

刚刚提到了一个名词:QPS。那么 QPS 到底是怎么样的概念,TPS 又有什么区别呢?

  • QPS(Queries Per Second):每秒查询数,意味着一台服务器每秒能够相应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。
  • TPS(Transactions Per Second):它是软件测试结果的测量单位。一个事务是指一个客户机向服务器发送请求然后服务器做出反应的过程。客户机在发送请求时开始计时,收到服务器响应后结束计时,以此来计算使用的时间和完成的事务个数。

在一次支付场景中,无论请求扣款服务多少次,TPS 只会记录一次,但是 QPS 可能会有多个。

因此通常我们都会用 QPS 来制定限流标准(说实话如果真按照 TPS 也不好计算)。

限流设计

常见的限流套路有:计数器、滑动窗口、漏桶和令牌桶。

计数器

计数器的实现非常简单,我们假设一秒钟承载 10 QPS,那么如果在一秒内统计超过 10 QPS,就意味着超过限制,则阻拦其他请求。

但是问题也很明显:

  1. 每一秒都承载了 10 QPS,但如果 20QPS 是在 0.5-1.5 这个时间流入的,那么其实超过了两倍的可承载量。
  2. 同样的,如果 1s 中承载了超过 10 QPS,而下一秒钟只有 1 QPS,那么也不代表系统一定会挂,可能在 0.5-1.5 这个时间块内,并没有超过 10 QPS,只是在 0-1 中统计超过了 10。

因此,这种一秒的固定维度统计可能会存在问题。

要实现这个也相当简单,我们使用 Redis 就可以很轻松的实现:

type Counter struct {
    limit    uint   // 最大限制 QPS
    redisKey string // cache key
    r        *redis.Client
}

func NewCounter(limit uint, redisKey string, client *redis.Client) *Counter {
    return &Counter{
        limit:    limit,
        redisKey: redisKey,
        r:        client,
    }
}

func (c *Counter) Try(ctx context.Context) bool {
    var (
        t = time.Now().Unix()
        k = c.redisKey + strconv.FormatInt(t, 10)
    )

    incr := c.r.Incr(ctx, k)
    c.r.Expire(ctx, k, time.Second)
    if res, err := incr.Result(); err != nil {
        return false
    } else {
        if res > int64(c.limit) {
            return false
        }
    }
    return true
}

Redis 官方文档中也有对这个的花式实现,就在 incr 章节:https://redis.io/docs/latest/commands/incr/

滑动窗口

刚刚我们提到,归根结底这会出现问题,都是因为 1s 太死了,不够灵活,那假设我们使用滑动窗口去动态滚动,不就完事儿了吗?使用这个方法,就能解决刚刚提到的 0.5 - 1.5s 这个统计口径带来的问题。

type Window struct {
    limit    uint   // 最大限制 QPS
    redisKey string // cache key
    r        *redis.Client
}

// KEYS[1] = redisKey
// ARGV[1] = now - 1s
// ARGV[2] = now
// ARGV[3] = random mem
// ARGV[4] = limit
const evalCommand = `
local current = redis.call("zcount", KEYS[1], ARGV[1], ARGV[2])
if current >= tonumber(ARGV[4]) then
    return -1
else
    redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
    return current
end`

func NewWindow(limit uint, redisKey string, client *redis.Client) *Window {
    return &Window{
        limit:    limit,
        redisKey: redisKey,
        r:        client,
    }
}

func (w *Window) Try(ctx context.Context) bool {
    var (
        now       = time.Now().UnixMilli()
        secBefore = now - 1000
        randStr   = strconv.FormatInt(rand.Int63n(1000000), 16)
    )
    result, err := w.r.Eval(ctx, evalCommand, []string{w.redisKey}, secBefore, now, fmt.Sprintf("%d-%s", now, randStr), w.limit).Result()
    if err != nil {
        fmt.Printf("eval error: %v", err)
        return false
    }
    if result.(int64) == -1 {
        return false
    }
    if err = w.r.ZRemRangeByScore(ctx, w.redisKey, "-inf", strconv.FormatInt(now-1000*5, 10)).Err(); err != nil {
        fmt.Printf("zrem error: %v", err)
    }
    return true
}

但是同样的,对于滑动窗口来说,我们不好实现等待,只能实现 block,因此他对于削峰填谷并没有帮助,还是需要其他实现方式。

令牌桶

上面提到了滑动窗口无法做到削峰填谷,因此我们需要一些新的实现方式,而令牌桶就是其中之一。

令牌桶的重点是按照恒定速率放入令牌,消费完了就进行 block 或者降级。

  1. 让系统以一个由限流目标决定的速率向桶中注入令牌,譬如要控制系统的访问不超过 100 次每秒,速率即设定为 100 个令牌每秒,每个令牌注入间隔为 1/100=10 毫秒。
  2. 桶中最多可以存放 N 个令牌,N 的具体数量是由超时时间和服务处理能力共同决定的。如果桶已满,第 N+1 个进入的令牌会被丢弃掉。
  3. 请求到时先从桶中取走 1 个令牌,如果桶已空就进入降级逻辑。

当然,这并不意味着我们就需要实现一个 Ticker 来进行定时任务,我们完全可以通过一个请求进来的时间和上次更新令牌桶的时间差来计算,在此期间需要补充多少令牌的库存,从而得到正确的结果。

  
type TokenBucket struct {  
    capacity uint   // 最大限制 QPS    redisKey string // cache key  
    r        *redis.Client  
    rate     int64  
}  
  
// redis 结构  
// hash 存储:  
// key: redisKey  
// field:  
//   - current: 目前令牌余量  
//   - last_update_time: 上次更新时间  
//  
// KEYS[1] = redisKey  
// ARGV[1] = now  
// ARGV[2] = capacity  
// ARGV[3] = rate  
const bucketCommand = `  
-- 获取当前桶信息  
local last_update_time = 0  
local current = 0  
local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")  
if variables[1] then  
    current = tonumber(variables[1]) or 0end  
if variables[2] then  
    last_update_time = tonumber(variables[2]) or 0end  
-- 获取当前时间时间戳  
local now = tonumber(ARGV[1])  
local capacity = tonumber(ARGV[2])  
local rate = tonumber(ARGV[3])  
local num = math.floor((now - last_update_time) * rate / 1000)  
local new_current = math.min(capacity, current + num)  
-- 重新计算后没有令牌了  
if new_current == 0 then  
    return -1end  
-- 更新令牌数  
if num == 0 then  
  redis.call("hmset", KEYS[1], "current", new_current - 1)else  
  redis.call("hmset", KEYS[1], "last_update_time", now, "current", new_current - 1)end  
return new_current - 1  
`  
  
func NewTokenBucket(capacity uint, redisKey string, rate int64, r *redis.Client) *TokenBucket {  
    return &TokenBucket{  
       capacity: capacity,  
       redisKey: redisKey,  
       r:        r,  
       rate:     rate,  
    }}  
  
// Try 尝试获取令牌  
// 令牌桶思路:  
// 1. 从 redis 中获取当前令牌桶信息  
// 2. 计算当前时间与上次更新时间的时间差,根据时间差更新令牌桶  
// 3. 减扣令牌  
// 4. 返回是否获取成功  
func (tb *TokenBucket) Try(ctx context.Context) bool {  
    now := time.Now().UnixMilli()  
    res, err := tb.r.Eval(ctx, bucketCommand, []string{tb.redisKey}, now, tb.capacity, tb.rate).Result()  
    if err != nil {  
       fmt.Printf("Eval failed: %v\n", err)  
       return false  
    }  
    if res.(int64) < 0 {  
       return false  
    }  
    return true  
}

漏桶

漏桶算法和令牌桶算法类似,唯一的区别是,令牌桶是从桶中拿令牌,拿完了代表超过限制,而漏桶则是把流量注入桶中,流量满(=capacity)则代表超过了限制。

  
type LeakyBucket struct {  
    r        *redis.Client  
    redisKey string  
    capacity int64  
    rate     int64  
}  
  
// KEYS[1] = redisKey  
// ARGV[1] = now  
// ARGV[2] = capacity  
// ARGV[3] = rate  
const leakyBucketCommand = `  
local last_update_time = 0  
local current = 0  
local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")  
if variables[1] then  
    current = tonumber(variables[1])end  
if variables[2] then  
    last_update_time = tonumber(variables[2])end  
local now = tonumber(ARGV[1])  
local capacity = tonumber(ARGV[2])  
local rate = tonumber(ARGV[3])  
local num = math.floor((now - last_update_time) * rate / 1000)  
local new_current = math.max(0, current - num)  
if new_current >= capacity then  
    return -1end  
if num == 0 then  
  redis.call("hmset", KEYS[1], "current", new_current + 1)else  
  redis.call("hmset", KEYS[1], "current", new_current + 1, "last_update_time", now)end  
return new_current + 1  
`  
  
func NewLeakyBucket(capacity int64, redisKey string, rate int64, r *redis.Client) *LeakyBucket {  
    return &LeakyBucket{  
       r:        r,  
       redisKey: redisKey,  
       capacity: capacity,  
       rate:     rate,  
    }}  
  
// Try 漏桶算法  
// 1. 获取当前时间  
// 2. 获取当前漏桶中的值和最后更新时间  
// 3. 根据最后更新时间的时间间隔和当前时间的差值,计算出应该释放多少容积  
// 4. 判断是否为 capacity// 5. 如果不是 capacity,则 +1 后写入新值,否则直接返回 falsefunc (lb *LeakyBucket) Try(ctx context.Context) bool {  
    now := time.Now().UnixMilli()  
    lastUpdate, err := lb.r.HGet(ctx, lb.redisKey, "last_update_time").Result()  
    current, err := lb.r.HGet(ctx, lb.redisKey, "current").Result()  
  
    lastUpdateNum, _ := strconv.ParseInt(lastUpdate, 10, 64)  
    adder := math.Floor(float64(now-lastUpdateNum) * float64(lb.rate) / 1000)  
    fmt.Printf("now: %v, lastUpdate: %v, duration: %v, shouldRemove: %v, current: %v\n", now, lastUpdate, now-lastUpdateNum, adder, current)  
    res, err := lb.r.Eval(ctx, leakyBucketCommand, []string{lb.redisKey}, now, lb.capacity, lb.rate).Result()  
    if err != nil {  
       fmt.Printf("Eval failed: %v\n", err)  
       return false  
    }  
    if res.(int64) < 0 {  
       return false  
    }  
    return true  
}

可以看出,令牌桶和漏桶更像是相同思路的不同实现,但其实我们可以通过令牌桶来处理突发的流量,因为令牌是一个不断存的过程,而使用漏桶来控制流量的平稳,因为漏桶本质就是控制流速。来同时解决突发流量和削峰这两种场景。

这一点因为 Redis 中没法存储一个所谓的漏桶队列,因此漏桶表现的更像令牌桶,如果有队列,那么看上去就清楚多了:

// LeakyBucketSimple 单进程的漏桶算法  
type LeakyBucketSimple struct {  
    capacity int64       // 桶容量  
    rate     int64       // 流速  
    mutex    sync.Mutex  // 互斥锁  
    queue    chan func() // 请求队列  
}  
  
func NewLeakyBucketSimple(capacity, rate int64) *LeakyBucketSimple {  
    lb := &LeakyBucketSimple{  
       capacity: capacity,  
       rate:     rate,  
       mutex:    sync.Mutex{},  
       queue:    make(chan func(), capacity),  
    }    go lb.leaking()  
    return lb  
}  
  
func (l *LeakyBucketSimple) Try(ctx context.Context, f func()) bool {  
    l.mutex.Lock()  
    defer l.mutex.Unlock()  
  
    if len(l.queue) >= int(l.capacity) {  
       fmt.Printf("Try to add but failed, current=%v, capacity=%v\n", len(l.queue), l.capacity)  
       return false  
    }  
    l.queue <- f  
    fmt.Printf("Try to add and success, current=%v\n", len(l.queue))  
    return true  
}  
  
func (l *LeakyBucketSimple) leaking() {  
    ticker := time.NewTicker(time.Duration(1000/l.rate) * time.Millisecond)  
    defer ticker.Stop()  
  
    for range ticker.C {  
       l.mutex.Lock()  
       select {  
       case req := <-l.queue:  
          req()  
       default:  
          fmt.Println("Queue is empty, nothing to leak.")  
       }       l.mutex.Unlock()  
    }}

分布式限流

前面因为我们是使用 Redis 实现的,因此天然的支持了分布式限流。但是在实际应用的高并发场景下就会遇到 Redis 成为了单点瓶颈的问题,此外,这意味着每次服务调用都会多增加一次网络 IO,成本反而会变高。

此时一个合适的做法是:基于令牌桶进行一次资源的再分配,具体的来说,假设我们有 5 台机器,而令牌桶里有 100 个令牌,那么我们先给每台机器分配 20 个,如果单机用完了,则再去桶里尝试拿 20 个。

此时拿 20 个以外的情况下都不需要网络 IO,就能有效的防止 Redis 之类的存储点的服务压力,也能提高响应速度。

在实际应用中,我们可以把单机的进程限流和分布式限流看做:

总结

最后我们考虑在什么情况下使用单机,什么情况下使用分布式:

  1. 单机:因为我们压测时往往会考虑单机的承载流量,因此单机的限流适合根据压测数据评估
  2. 分布式:整条链路中的资源都是有限的,不应该因为某个点压垮下游(比如 Redis 或者 MySQL),这种情况下就可以使用分布式限流去限制整个系统中的使用。

植入部分

如果您觉得文章不错,可以通过赞助支持我。

如果您不希望打赏,也可以通过关闭广告屏蔽插件的形式帮助网站运作。

标签: 知识, 代码段

添加新评论