限流算法之漏桶算法、令牌桶算法

前言

在分布式环境下,为了应对高并发,通常有以下几种手段,优先级从高到低依次为缓存、限流、降级、熔断。

缓存即是缓存热点数据,以便请求来时可以做出快速反映,减小数据库压力。

降级和熔断广义上来讲也算是限流的一种,本质上也是阻止请求进来。

今天这篇文章我们主要来讲一讲限流的两种算法以及实现方式。

正文

限流的意义

对于一个对外服务应用,我们为什么要限流?它的意义在哪里呢?

我们知道,对于对外应用,有很多情况会导致流量增大:

  • 服务用户量的不断增长
  • 针对服务的恶意攻击
  • 各种活动等

需要注意流量变“大”是相对的,相对于我们服务所能承受的流量。

比如我们服务支持10000QPS的处理能力,如果每秒处理5000个请求,显然不大,但如果服务只支持1000QPS的处理能力,那每秒5000的请求对于服务器显然“大”了。

如果长时间这样,显然会导致我们的服务熔断或挂掉,为了保证服务器稳定,我们自然要对流量进行控制,这就是限流。

PS:限流的优先级在缓存之后,因此对于这种情况,我们应首先尽可能的提高服务的QPS能力,优化逻辑,缓存热点数据等。

漏桶算法

算法内容

漏桶算法(Leaky Bucket)的原理比较简单:水(请求)先进入到漏桶里,人为设置一个最大出水速率,漏桶以<=出水速率的速度出水,当水流入速度过大会直接溢出(拒绝服务)。

如下图:

upload successful

该算法思想如下:

  1. 比如设置请求速率为1000QPS,容量池为5000,当请求小于1000QPS时,正常进行处理;
  2. 当请求为2000QPS时,每秒处理1000个请求后会剩下1000个请求;
  3. 当第5s时,容量已满,后续新请求丢弃,直到容量池内有请求被处理掉。

可以看到这种算法强制限制请求速率,缺点是十分明显的:

  • 无法面对突发的大流量 —— 比如上述例子,设置请求速率为1000QPS,应用在绝大多数情况下请求小于1000QPS,突然某天活动,用户访问激增,那么此种限流会导致大多数用户无法正常访问服务。
  • 无法有效利用资源 —— 虽然我们服务处理能力是1000QPS,但这不是绝对的,比如前6s为2000QPS,后面时间为500QPS,一小段时间服务器资源是可以承受这段请求压力的,但是漏桶算法这种情况下会丢弃一部分请求。

相关代码实现

相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class LeakyBucketLimit {
/**
* 速率 10QPS
*/
private final int RATE = 10;
/**
* 最大容量50
*/
private final int BURST = 50;

/**
* 请求最新时间
*/
private long lastTime = System.currentTimeMillis() /1000;

private int count;

private void refresh(){
long now = System.currentTimeMillis() / 1000;
count = (int)Math.max(0,count - (now - lastTime)*RATE);
lastTime = now;
}

public boolean limit(){
refresh();
if(count < BURST){
count++;
return true;
}else{
return false;
}
}

public static void main(String[] args) throws Exception{
LeakyBucketLimit limit = new LeakyBucketLimit();
for (int i = 0; i < 100; i++) {
System.out.println(i+"->"+limit.limit());
}
}
}

运行结果如下:

upload successful

可以看到当达到容量50时请求就会被拒绝。

PS:需要注意的是,实际中当请求完成后,会被释放,池子里请求是动态增加减少的过程。

通常来说,实际中使用漏桶算法来进行限流的场景并不多。

令牌桶算法

算法内容

令牌桶算法(Token Bucket)是网络流量整形(Traffic Shaping)和限流(Rate Limiting)中最常使用的一种算法,它可用于控制发送到网络上数据的数量并允许突发数据的发送。

令牌桶算法可以认为是对漏桶算法的一种改进,主要在于令牌桶算法在限制平均调用速率的同时还允许一定程度的突发调用。

令牌桶算法的实现原理如下图:

upload successful

整个过程如下:

  1. 系统以恒定的速率产生令牌,然后将令牌放入令牌桶中;
  2. 令牌桶有一个容量,当令牌桶满了的时候,再向其中放入的令牌就会被丢弃;
  3. 每次一个请求过来,需要从令牌桶中获取一个令牌,假设有令牌,那么提供服务;假设没有令牌,那么拒绝服务。

现在我们来看下为什么令牌桶可以防止一定程度上的突发流量。

假设我们想要调用速率为1000QPS,那么设置令牌的生成速度1000个/s,第一秒请求800个,那么会余下200个令牌,在第二秒请求时,就支持1200个请求了。

需要注意的是令牌的生成并不是无上限的,也必须有个容量,比如令牌的生成速度1000个/s,前5秒请求200个/s,那么第6秒支持4000个请求,但这远超系统承受能力,因此令牌桶算法需要设置桶中令牌的上限。

由于令牌桶算法允许一定程度的突发调用,因此在限流场景里比漏桶算法更加广泛。

RateLimiter

Google开源工具包Guava提供了限流工具类RateLimiter,该类就是基于令牌桶算法来完成限流,非常易于使用。

RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。它支持两种获取permits接口,一种是如果拿不到立刻返回false,一种会阻塞等待一段时间看能不能拿到。

RateLimiterJava 中的信号量(java.util.concurrent.Semaphore)类似,Semaphore通常用于限制并发量。

我们通过Maven引入相关Jar包。

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>

找到RateLimiter,我们在源码注释中可以看到如下一些代码及说明。

比如我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:

1
2
3
4
5
6
7
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}

想象下我们制造了一个数据流,并希望以每秒5kb的速率处理它。可以通过要求每个字节代表一个许可,然后指定每秒5000个许可来完成:

1
2
3
4
5
6
// 每秒5000个许可
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
rateLimiter.acquire(packet.length);
networkService.send(packet);
}

有一点非常重要,那就是请求的许可数从来不会影响到请求本身的限制(调用acquire(1) 和调用acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的限制,也就是说,如果一个高开销的任务抵达一个空闲的RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter 并不提供公平性的保证。

因此,可以认为RateLimiter支持预消费的能力。突发流量的处理,在令牌桶算法中有两种方式,一种是有足够的令牌才能消费,一种是先消费后还令牌。

RateLimiter相关源码

RateLimiter其实是一个abstract类,但是它提供了几个static方法用于创建RateLimiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//部分代码略
public abstract class RateLimiter {
/**
* 创建一个稳定输出令牌的RateLimiter,保证了平均每秒不超过permitsPerSecond个请求
* 当请求到来的速度超过了permitsPerSecond,保证每秒只处理permitsPerSecond个请求
* 当这个RateLimiter使用较少(即请求到来速度小于permitsPerSecond),会囤积最多permitsPerSecond个请求
* @param permitsPerSecond
*/
public static RateLimiter create(double permitsPerSecond) {
//...
}

/**
* 创建一个稳定输出令牌的RateLimiter,保证了平均每秒不超过permitsPerSecond个请求
* 还包含一个预热期(warmup period),预热期内,RateLimiter会平滑的将其释放令牌的速率加大,直到起达到最大速率
* 同样,如果RateLimiter在预热期没有足够的请求,则起速率会逐渐降低到“冷却”状态
* 设计这个的意图是为了满足那种资源提供方需要预热时间,而不是每次访问都能提供稳定速率的服务的情况(比如带缓存服务,需要定期刷新缓存的)
* 参数warmupPeriod和unit决定了其从“冷却”状态到达最大速率的时间
* @param permitsPerSecond
* @param warmupPeriod
* @param unit
*/
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
//...
}

/**
* 内部调用的 create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
*/
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) {
return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS);
}
}

RateLimiter提供了两个获取令牌的方法,不带参数表示获取一个令牌。如果没有令牌则一直等待,返回等待的时间(单位为秒),没有被限流则直接返回0.0。

1
2
public double acquire();
public double acquire(int permits);

尝试获取令牌,分为待超时时间和不带超时时间两种。

1
2
3
4
5
6
7
8
//尝试获取一个令牌,立即返回
public boolean tryAcquire();
//尝试获取permits个令牌,立即返回
public boolean tryAcquire(int permits);
//尝试获取1个令牌,带超时时间
public boolean tryAcquire(long timeout, TimeUnit unit);
//尝试获取permits个令牌,带超时时间
public boolean tryAcquire(int permits, long timeout, TimeUnit unit);

我们从acquire来看RateLimiter如何实现限流。相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {//并发加锁
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}

abstract long reserveEarliestAvailable(int permits, long nowMicros);

RateLimiter的具体实现SmoothRateLimiter里的reserveEarliestAvailable方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
//补充令牌
resync(nowMicros);
//本次请求获取令牌时间
long returnValue = nextFreeTicketMicros;
//本次请求能从存储的令牌桶中获取的数量(取的为请求数与存储数的最小值)
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
//本次请求需要额外补充的令牌(如果存储>请求,该值为0;如果存储<请求,该值>0)
double freshPermits = requiredPermits - storedPermitsToSpend;
//使用存储令牌等待时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
//计算下次请求获取令牌时间
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
//令牌存量减少
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

storedPermitsToWaitTime方法有两种实现,因为获取存储的令牌由于资源不足有可能需要等待时间。

一种是资源确实不足,这些剩余的资源我们是可以继续使用的;另一种是提供资源的服务过去还没准备好(预热期)。

为此,RateLimiter对于storedPermitsToWaitTime有两种实现策略,即SmoothBurstySmoothWarmingUp

SmoothBursty

SmoothBursty使用storedPermits不需要额外等待时间。并且默认maxBurstSeconds为1,因此maxPermitspermitsPerSecond,即最多可以存储1秒的剩余令牌,比如QPS=5,则maxPermits=5。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final double maxBurstSeconds;

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}

@Override
double coolDownIntervalMicros() {
return stableIntervalMicros;
}
}

SmoothWarmingUp

SmoothWarmingUpmaxPermits等于预热(warmup)期间能产生的令牌数,比如QPS=4,warmup为2秒,则maxPermits=8。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
static final class SmoothWarmingUp extends SmoothRateLimiter {
private final long warmupPeriodMicros;
/**
* The slope of the line from the stable interval (when permits == 0), to the cold interval
* (when permits == maxPermits)
*/
private double slope;

private double thresholdPermits;
private double coldFactor;

SmoothWarmingUp(
SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
super(stopwatch);
this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
this.coldFactor = coldFactor;
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}

private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}

@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
}

RateLimiter的限制

需要注意的是RateLimiter虽然很强大,但是它只支持单机环境。

比如我们服务集群中有5台服务器,要保证1000QPS的集群接口调用量,RateLimiter就无法实现了。

集群控流通常的做法是采用Redis来进行限制,主要有两种方式:

  • 固定窗口计数
  • 结合lua脚本,实现分布式的令牌桶算法

我们这儿就暂不做详细讨论。

总结

本文我们介绍了漏桶算法和令牌桶算法,了解了它们各自的特点。

对漏桶算法进行了简单的代码实现,对令牌桶算法我们分析了Guava提供了限流工具类RateLimiter

除了讨论了它们的特点,也说明了它们的一些不足。

不过无论哪种限流算法,都有自己的应用之处,具体场景需要具体分析。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

您的支持就是我创作的动力!

欢迎关注我的其它发布渠道