CodeSky 代码之空

随手记录自己的学习过程

限流与常见实现

2024-09-29 19:05分类: Go评论: 0

限流也是一个系统中老生常谈的话题了,因为资源不是无限的,因此系统总会达到一个瓶颈,我们不可能接受无限的流量直到系统崩溃,于是也就有了限流策略。

多少流量该限流

一般来说,我们有几种方法可以来对系统进行评估:

  1. 正统做法:压测。通过压测对当前系统进行评估,就可以知道单机可承载的 QPS,从而进行整体的限流评估。(注意:限流往往是分布式,而不是单机的,因此单机压测后需要 * N)
  2. 懒狗做法:当然,好多野鸡服务可能是不太会做压测的,这类服务通常都不是重保类的服务,在刚上线时也不太会有多大问题,那么我们可以先不设限流,运行一段时间,来评估正常流量,以正常流量的两到三倍作为异常。

名词解释

刚刚提到了一个名词:QPS。那么 QPS 到底是怎么样的概念,TPS 又有什么区别呢?

  • QPS(Queries Per Second):每秒查询数,意味着一台服务器每秒能够相应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。
  • TPS(Transactions Per Second):它是软件测试结果的测量单位。一个事务是指一个客户机向服务器发送请求然后服务器做出反应的过程。客户机在发送请求时开始计时,收到服务器响应后结束计时,以此来计算使用的时间和完成的事务个数。

在一次支付场景中,无论请求扣款服务多少次,TPS 只会记录一次,但是 QPS 可能会有多个。

因此通常我们都会用 QPS 来制定限流标准(说实话如果真按照 TPS 也不好计算)。

限流设计

常见的限流套路有:计数器、滑动窗口、漏桶和令牌桶。

计数器

计数器的实现非常简单,我们假设一秒钟承载 10 QPS,那么如果在一秒内统计超过 10 QPS,就意味着超过限制,则阻拦其他请求。

但是问题也很明显:

  1. 每一秒都承载了 10 QPS,但如果 20QPS 是在 0.5-1.5 这个时间流入的,那么其实超过了两倍的可承载量。
  2. 同样的,如果 1s 中承载了超过 10 QPS,而下一秒钟只有 1 QPS,那么也不代表系统一定会挂,可能在 0.5-1.5 这个时间块内,并没有超过 10 QPS,只是在 0-1 中统计超过了 10。

因此,这种一秒的固定维度统计可能会存在问题。

要实现这个也相当简单,我们使用 Redis 就可以很轻松的实现:

1type Counter struct {
2	limit    uint   // 最大限制 QPS
3	redisKey string // cache key
4	r        *redis.Client
5}
6
7func NewCounter(limit uint, redisKey string, client *redis.Client) *Counter {
8	return &Counter{
9		limit:    limit,
10		redisKey: redisKey,
11		r:        client,
12	}
13}
14
15func (c *Counter) Try(ctx context.Context) bool {
16	var (
17		t = time.Now().Unix()
18		k = c.redisKey + strconv.FormatInt(t, 10)
19	)
20
21	incr := c.r.Incr(ctx, k)
22	c.r.Expire(ctx, k, time.Second)
23	if res, err := incr.Result(); err != nil {
24		return false
25	} else {
26		if res > int64(c.limit) {
27			return false
28		}
29	}
30	return true
31}
32

Redis 官方文档中也有对这个的花式实现,就在 incr 章节:https://redis.io/docs/latest/commands/incr/

滑动窗口

刚刚我们提到,归根结底这会出现问题,都是因为 1s 太死了,不够灵活,那假设我们使用滑动窗口去动态滚动,不就完事儿了吗?使用这个方法,就能解决刚刚提到的 0.5 - 1.5s 这个统计口径带来的问题。

1type Window struct {
2	limit    uint   // 最大限制 QPS
3	redisKey string // cache key
4	r        *redis.Client
5}
6
7// KEYS[1] = redisKey
8// ARGV[1] = now - 1s
9// ARGV[2] = now
10// ARGV[3] = random mem
11// ARGV[4] = limit
12const evalCommand = `
13local current = redis.call("zcount", KEYS[1], ARGV[1], ARGV[2])
14if current >= tonumber(ARGV[4]) then
15    return -1
16else
17	redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
18	return current
19end`
20
21func NewWindow(limit uint, redisKey string, client *redis.Client) *Window {
22	return &Window{
23		limit:    limit,
24		redisKey: redisKey,
25		r:        client,
26	}
27}
28
29func (w *Window) Try(ctx context.Context) bool {
30	var (
31		now       = time.Now().UnixMilli()
32		secBefore = now - 1000
33		randStr   = strconv.FormatInt(rand.Int63n(1000000), 16)
34	)
35	result, err := w.r.Eval(ctx, evalCommand, []string{w.redisKey}, secBefore, now, fmt.Sprintf("%d-%s", now, randStr), w.limit).Result()
36	if err != nil {
37		fmt.Printf("eval error: %v", err)
38		return false
39	}
40	if result.(int64) == -1 {
41		return false
42	}
43	if err = w.r.ZRemRangeByScore(ctx, w.redisKey, "-inf", strconv.FormatInt(now-1000*5, 10)).Err(); err != nil {
44		fmt.Printf("zrem error: %v", err)
45	}
46	return true
47}
48

但是同样的,对于滑动窗口来说,我们不好实现等待,只能实现 block,因此他对于削峰填谷并没有帮助,还是需要其他实现方式。

令牌桶

上面提到了滑动窗口无法做到削峰填谷,因此我们需要一些新的实现方式,而令牌桶就是其中之一。

令牌桶的重点是按照恒定速率放入令牌,消费完了就进行 block 或者降级。

  1. 让系统以一个由限流目标决定的速率向桶中注入令牌,譬如要控制系统的访问不超过 100 次每秒,速率即设定为 100 个令牌每秒,每个令牌注入间隔为 1/100=10 毫秒。
  2. 桶中最多可以存放 N 个令牌,N 的具体数量是由超时时间和服务处理能力共同决定的。如果桶已满,第 N+1 个进入的令牌会被丢弃掉。
  3. 请求到时先从桶中取走 1 个令牌,如果桶已空就进入降级逻辑。

当然,这并不意味着我们就需要实现一个 Ticker 来进行定时任务,我们完全可以通过一个请求进来的时间和上次更新令牌桶的时间差来计算,在此期间需要补充多少令牌的库存,从而得到正确的结果。

1  
2type TokenBucket struct {  
3    capacity uint   // 最大限制 QPS    redisKey string // cache key  
4    r        *redis.Client  
5    rate     int64  
6}  
7  
8// redis 结构  
9// hash 存储:  
10// key: redisKey  
11// field:  
12//   - current: 目前令牌余量  
13//   - last_update_time: 上次更新时间  
14//  
15// KEYS[1] = redisKey  
16// ARGV[1] = now  
17// ARGV[2] = capacity  
18// ARGV[3] = rate  
19const bucketCommand = `  
20-- 获取当前桶信息  
21local last_update_time = 0  
22local current = 0  
23local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")  
24if variables[1] then  
25    current = tonumber(variables[1]) or 0end  
26if variables[2] then  
27    last_update_time = tonumber(variables[2]) or 0end  
28-- 获取当前时间时间戳  
29local now = tonumber(ARGV[1])  
30local capacity = tonumber(ARGV[2])  
31local rate = tonumber(ARGV[3])  
32local num = math.floor((now - last_update_time) * rate / 1000)  
33local new_current = math.min(capacity, current + num)  
34-- 重新计算后没有令牌了  
35if new_current == 0 then  
36    return -1end  
37-- 更新令牌数  
38if num == 0 then  
39  redis.call("hmset", KEYS[1], "current", new_current - 1)else  
40  redis.call("hmset", KEYS[1], "last_update_time", now, "current", new_current - 1)end  
41return new_current - 1  
42`  
43  
44func NewTokenBucket(capacity uint, redisKey string, rate int64, r *redis.Client) *TokenBucket {  
45    return &TokenBucket{  
46       capacity: capacity,  
47       redisKey: redisKey,  
48       r:        r,  
49       rate:     rate,  
50    }}  
51  
52// Try 尝试获取令牌  
53// 令牌桶思路:  
54// 1. 从 redis 中获取当前令牌桶信息  
55// 2. 计算当前时间与上次更新时间的时间差,根据时间差更新令牌桶  
56// 3. 减扣令牌  
57// 4. 返回是否获取成功  
58func (tb *TokenBucket) Try(ctx context.Context) bool {  
59    now := time.Now().UnixMilli()  
60    res, err := tb.r.Eval(ctx, bucketCommand, []string{tb.redisKey}, now, tb.capacity, tb.rate).Result()  
61    if err != nil {  
62       fmt.Printf("Eval failed: %v\n", err)  
63       return false  
64    }  
65    if res.(int64) < 0 {  
66       return false  
67    }  
68    return true  
69}
70

漏桶

漏桶算法和令牌桶算法类似,唯一的区别是,令牌桶是从桶中拿令牌,拿完了代表超过限制,而漏桶则是把流量注入桶中,流量满(=capacity)则代表超过了限制。

1  
2type LeakyBucket struct {  
3    r        *redis.Client  
4    redisKey string  
5    capacity int64  
6    rate     int64  
7}  
8  
9// KEYS[1] = redisKey  
10// ARGV[1] = now  
11// ARGV[2] = capacity  
12// ARGV[3] = rate  
13const leakyBucketCommand = `  
14local last_update_time = 0  
15local current = 0  
16local variables = redis.call("hmget", KEYS[1], "current", "last_update_time")  
17if variables[1] then  
18    current = tonumber(variables[1])end  
19if variables[2] then  
20    last_update_time = tonumber(variables[2])end  
21local now = tonumber(ARGV[1])  
22local capacity = tonumber(ARGV[2])  
23local rate = tonumber(ARGV[3])  
24local num = math.floor((now - last_update_time) * rate / 1000)  
25local new_current = math.max(0, current - num)  
26if new_current >= capacity then  
27    return -1end  
28if num == 0 then  
29  redis.call("hmset", KEYS[1], "current", new_current + 1)else  
30  redis.call("hmset", KEYS[1], "current", new_current + 1, "last_update_time", now)end  
31return new_current + 1  
32`  
33  
34func NewLeakyBucket(capacity int64, redisKey string, rate int64, r *redis.Client) *LeakyBucket {  
35    return &LeakyBucket{  
36       r:        r,  
37       redisKey: redisKey,  
38       capacity: capacity,  
39       rate:     rate,  
40    }}  
41  
42// Try 漏桶算法  
43// 1. 获取当前时间  
44// 2. 获取当前漏桶中的值和最后更新时间  
45// 3. 根据最后更新时间的时间间隔和当前时间的差值,计算出应该释放多少容积  
46// 4. 判断是否为 capacity// 5. 如果不是 capacity,则 +1 后写入新值,否则直接返回 falsefunc (lb *LeakyBucket) Try(ctx context.Context) bool {  
47    now := time.Now().UnixMilli()  
48    lastUpdate, err := lb.r.HGet(ctx, lb.redisKey, "last_update_time").Result()  
49    current, err := lb.r.HGet(ctx, lb.redisKey, "current").Result()  
50  
51    lastUpdateNum, _ := strconv.ParseInt(lastUpdate, 10, 64)  
52    adder := math.Floor(float64(now-lastUpdateNum) * float64(lb.rate) / 1000)  
53    fmt.Printf("now: %v, lastUpdate: %v, duration: %v, shouldRemove: %v, current: %v\n", now, lastUpdate, now-lastUpdateNum, adder, current)  
54    res, err := lb.r.Eval(ctx, leakyBucketCommand, []string{lb.redisKey}, now, lb.capacity, lb.rate).Result()  
55    if err != nil {  
56       fmt.Printf("Eval failed: %v\n", err)  
57       return false  
58    }  
59    if res.(int64) < 0 {  
60       return false  
61    }  
62    return true  
63}
64

可以看出,令牌桶和漏桶更像是相同思路的不同实现,但其实我们可以通过令牌桶来处理突发的流量,因为令牌是一个不断存的过程,而使用漏桶来控制流量的平稳,因为漏桶本质就是控制流速。来同时解决突发流量和削峰这两种场景。

这一点因为 Redis 中没法存储一个所谓的漏桶队列,因此漏桶表现的更像令牌桶,如果有队列,那么看上去就清楚多了:

1// LeakyBucketSimple 单进程的漏桶算法  
2type LeakyBucketSimple struct {  
3    capacity int64       // 桶容量  
4    rate     int64       // 流速  
5    mutex    sync.Mutex  // 互斥锁  
6    queue    chan func() // 请求队列  
7}  
8  
9func NewLeakyBucketSimple(capacity, rate int64) *LeakyBucketSimple {  
10    lb := &LeakyBucketSimple{  
11       capacity: capacity,  
12       rate:     rate,  
13       mutex:    sync.Mutex{},  
14       queue:    make(chan func(), capacity),  
15    }    go lb.leaking()  
16    return lb  
17}  
18  
19func (l *LeakyBucketSimple) Try(ctx context.Context, f func()) bool {  
20    l.mutex.Lock()  
21    defer l.mutex.Unlock()  
22  
23    if len(l.queue) >= int(l.capacity) {  
24       fmt.Printf("Try to add but failed, current=%v, capacity=%v\n", len(l.queue), l.capacity)  
25       return false  
26    }  
27    l.queue <- f  
28    fmt.Printf("Try to add and success, current=%v\n", len(l.queue))  
29    return true  
30}  
31  
32func (l *LeakyBucketSimple) leaking() {  
33    ticker := time.NewTicker(time.Duration(1000/l.rate) * time.Millisecond)  
34    defer ticker.Stop()  
35  
36    for range ticker.C {  
37       l.mutex.Lock()  
38       select {  
39       case req := <-l.queue:  
40          req()  
41       default:  
42          fmt.Println("Queue is empty, nothing to leak.")  
43       }       l.mutex.Unlock()  
44    }}
45

分布式限流

前面因为我们是使用 Redis 实现的,因此天然的支持了分布式限流。但是在实际应用的高并发场景下就会遇到 Redis 成为了单点瓶颈的问题,此外,这意味着每次服务调用都会多增加一次网络 IO,成本反而会变高。

此时一个合适的做法是:基于令牌桶进行一次资源的再分配,具体的来说,假设我们有 5 台机器,而令牌桶里有 100 个令牌,那么我们先给每台机器分配 20 个,如果单机用完了,则再去桶里尝试拿 20 个。

此时拿 20 个以外的情况下都不需要网络 IO,就能有效的防止 Redis 之类的存储点的服务压力,也能提高响应速度。

在实际应用中,我们可以把单机的进程限流和分布式限流看做:

总结

最后我们考虑在什么情况下使用单机,什么情况下使用分布式:

  1. 单机:因为我们压测时往往会考虑单机的承载流量,因此单机的限流适合根据压测数据评估
  2. 分布式:整条链路中的资源都是有限的,不应该因为某个点压垮下游(比如 Redis 或者 MySQL),这种情况下就可以使用分布式限流去限制整个系统中的使用。

评论 (0)