Fork me on GitHub

SpringCloud Ribbon 负载均衡策略

前言

通过上篇文章对于SpringCloud Ribbon 负载均衡器的解读,我们已经对 Ribbon 实现负载均衡器以及其中包含的服务实例过滤器、服务实例信息存储对象、区域的信息快照等有了深入的认识和理解,接下来我们来看下负载均衡的几个策略实现。

正文

Ribbon中的负载均衡选择策略通过实现IRule接口来实现。具体关系如下图:

upload successful

下面我们来看下各种负载均衡策略。

AbstractLoadBalancerRule

负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够在具体实现选择服务策略时,获取到一些负载均衡中维护的信息来作为分配依据,并以此设计一些算法来实现针对特定场景的高效策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

private ILoadBalancer lb;

@Override
public void setLoadBalancer(ILoadBalancer lb){
this.lb = lb;
}

@Override
public ILoadBalancer getLoadBalancer(){
return lb;
}
}

RandomRule

该策略实现了从服务实例清单中随机选择一个服务实例的功能。其具体实现如下:

主要由choose函数完成,委托给函数choose(ILoadBalancer lb, Object key)来实现。

  1. 获取可用实例列表upList和所有实例列表allList
  2. 获取一个随机数,通过chooseRandomInt(serverCount)函数;
  3. 将该随机数作为upList的索引值来返回具体实例;
  4. 选择逻辑处于一个循环中,正常情况下,每次都应该选出一个具体实例,如果出现死循环获取不到服务实例的情况,则可能出现一些问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;

while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();

int serverCount = allList.size();
if (serverCount == 0) {
return null;
}

int index = chooseRandomInt(serverCount);
server = upList.get(index);

if (server == null) {
Thread.yield();
continue;
}

if (server.isAlive()) {
return (server);
}

server = null;
Thread.yield();
}

return server;

}

protected int chooseRandomInt(int serverCount) {
return ThreadLocalRandom.current().nextInt(serverCount);
}

@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}

RoundRobinRule

该策略实现了按照线性轮询的方式一次选择每个服务实例的功能。其具体实现逻辑如下:

  1. 获取可用实例列表reachableServers和所有实例列表allServers,并记录它们的数量upCountserverCount
  2. 获取下一个可用服务的索引,主要通过incrementAndGetModulo函数实现;
  3. 选择逻辑处于循环中,与RandomRule不同的是,如果一直选不到server超过10次,该循环终止,打印警告日志并返回null。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}

Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();

if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}

int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);

if (server == null) {
/* Transient. */
Thread.yield();
continue;
}

if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}

// Next.
server = null;
}

if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}

private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}

@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}

RetryRule

该策略实现了一个具备重试机制的实例选择功能。其具体实现逻辑如下:

  1. 内部定义一个IRule对象,默认使用RoundRobinRule实例;
  2. choose函数中实现了对内部策略进行反复尝试的策略;
  3. 若期间能够选择到具体实例就返回,若选择不到就根据设置的尝试结束时间为阈值maxRetryMillis,当超过阈值后就返回null。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class RetryRule extends AbstractLoadBalancerRule {
IRule subRule = new RoundRobinRule();
long maxRetryMillis = 500;

public RetryRule() {
}

public RetryRule(IRule subRule) {
this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
}

public RetryRule(IRule subRule, long maxRetryMillis) {
this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
this.maxRetryMillis = (maxRetryMillis > 0) ? maxRetryMillis : 500;
}

public void setRule(IRule subRule) {
this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
}

public IRule getRule() {
return subRule;
}

public void setMaxRetryMillis(long maxRetryMillis) {
if (maxRetryMillis > 0) {
this.maxRetryMillis = maxRetryMillis;
} else {
this.maxRetryMillis = 500;
}
}

public long getMaxRetryMillis() {
return maxRetryMillis;
}



@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
subRule.setLoadBalancer(lb);
}

/*
* Loop if necessary. Note that the time CAN be exceeded depending on the
* subRule, because we're not spawning additional threads and returning
* early.
*/
public Server choose(ILoadBalancer lb, Object key) {
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;

Server answer = null;

answer = subRule.choose(key);

if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {

InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());

while (!Thread.interrupted()) {
answer = subRule.choose(key);

if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}

task.cancel();
}

if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}

@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}

WeightedResponseTimeRule

该策略是对RoundRobinRule的扩展,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例,以达到更优的分配效果。

其主要构成如下:

定时任务

WeightedResponseTimeRule策略在初始化的时候会通过serverWeightTimer.schedule启动一个定时任务,用来为每个服务实例计算权重,该任务默认30s执行一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//....
public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;

private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;

//....

@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof BaseLoadBalancer) {
name = ((BaseLoadBalancer) lb).getName();
}
initialize(lb);
}

void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
sw.maintainWeights();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}

//....

class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
//....

权重计算

在源码中我们可以找到用于存储权重的对象 List accumulatedWeights = new ArrayList(),该List中每个权重值所处的位置对应了负载均衡器维护的实例清单中所有实例所在清单中的位置。

维护实例权重的计算过程通过maintainWeights函数实现,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}

if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}

try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;

// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}

}

该函数实现内容如下:

  • 根据LoadBalancerStats中记录的每个实例的统计信息,累加所有实例的平均响应时间,得到总平均响应时间totalResponseTime,该值会用于后续计算。
  • 为负载均衡器中维护的实例清单逐个计算权重(从第一个开始),计算规则为weightSoFar+totalResponseTime-实例平均响应时间,其中weightSoFar初始化为0,并且每计算好一个权重需要累加到weightSoFar上供下次计算使用。

如下例子:

假设4个实例A、B、C、D,它们平均响应时间为10、40、80、100,所以总响应时间为10+40+80+100=230,根据上面,可以计算出实例A、B、C、D的权重:

  • A:230 - 10 = 220
  • B:220 + (230 - 40) = 410
  • C:410 + (230 - 80) = 560
  • D:560 + (230 - 100) = 690

需要注意的是,这里的权重值只是表示了各实例权重区间上限,并非实例优先级。实例A、B、C、D的权重区间如下:

  • A:[0,220]
  • B:(220,410]
  • C:(410,560]
  • D:(560,690)

可以看到,每个区间的宽度就是:总平均响应时间-实例的平均响应时间,所有实例的平均响应时间越短、权重区间的宽度越大,宽度越大被选中的概率就越高。

我们再来看下区间边界的开闭是如何确定的。

实例选择

我们来看下该策略的实例选择算法相关代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;

while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();

int serverCount = allList.size();

if (serverCount == 0) {
return null;
}

int serverIndex = 0;

// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}

server = allList.get(serverIndex);
}

if (server == null) {
/* Transient. */
Thread.yield();
continue;
}

if (server.isAlive()) {
return (server);
}

// Next.
server = null;
}
return server;
}

其代码逻辑主要如下:

  1. 生成一个 [0,最大权重值) 区间内的随机数。
  2. 遍历权重列表,比较权重值与随机数的大小,如果权重值大于等于随机数,就拿当前权重列表的索引值去服务实例列表中获取具体的实例。因此每个权重区间为 (x,y] 的形式,由于随机数的最小值可以为0,所以第一个实例的下限是闭区间,随机数最大值取不到权重最大值,所以最后一个实例上限是开区间。

按照上面的例子,如果随机数为230,则该值位于第二区间,所以此时就会选择实例B进行请求。

ClientConfigEnabledRoundRobinRule

该策略较为特殊,我们一般不直接使用它。因为它本身没有实现任何特殊的处理逻辑,如代码所示,在它内部定义了一个RoundRobinRule策略,而choose函数的实现也正是使用了RoundRobinRule的线下轮询机制。

虽然我们不会直接使用该策略,但是通过继承该策略,默认的choose就实现了线性轮询机制,在子类中做一些高级策略时通常可能存在一些无法实施的情况,那么就可以使用父类的实现作为备选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

RoundRobinRule roundRobinRule = new RoundRobinRule();

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
roundRobinRule = new RoundRobinRule();
}

@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
roundRobinRule.setLoadBalancer(lb);
}

@Override
public Server choose(Object key) {
if (roundRobinRule != null) {
return roundRobinRule.choose(key);
} else {
throw new IllegalArgumentException(
"This class has not been initialized with the RoundRobinRule class");
}
}

}

BestAvailableRule

该策略继承自ClientConfigEnabledRoundRobinRule,同时在实现中注入了负载均衡器统计对象LoadBalancerStats,算法通过利用统计对象中保存的实例信息来选择满足要求的实例。

通过代码我们可以看到,它通过遍历负载均衡器中维护的所有实例,会过滤掉故障实例,并找出并发请求数最小的一个,所以该策略的特性是可以选出最空闲的实例。

LoadBalancerStats对象为空时,会使用父类的RoundRobinRule策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

private LoadBalancerStats loadBalancerStats;

@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
for (Server server: serverList) {
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}

@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof AbstractLoadBalancer) {
loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
}
}
}

PredicateBasedRule

这是一个抽象策略,它也继承自ClientConfigEnabledRoundRobinRule,其基础逻辑如下:

先通过子类中实现的Predicate逻辑来过滤一部分服务实例,然后再以线性轮询的方式从过滤后的实例清单中选出一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

public abstract AbstractServerPredicate getPredicate();

@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}

它是通过 Google Guava Collection 工具对集合进行过滤的接口Predicate来实现的。我们来看下AbstractServerPredicate的部分关键逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
//......
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
List<Server> eligible = getEligibleServers(servers);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

public List<Server> getEligibleServers(List<Server> servers) {
return getEligibleServers(servers, null);
}

public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}

//......
}

这个抽象策略只是提供一个实现过滤清单的模板,具体实现需要其子类去完成(实现Predicate接口的apply方法),过滤清单后得到符合条件的实例,轮询选择。

1
2
3
4
5
6
@GwtCompatible
public interface Predicate<T> {
boolean apply(@Nullable T var1);

boolean equals(@Nullable Object var1);
}

AvailabilityFilteringRule

该策略继承自PredicateBasedRule,其过滤条件使用了AvailabilityPredicate

AvailabilityPredicate的关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class AvailabilityPredicate extends  AbstractServerPredicate {
//......
@Override
public boolean apply(@Nullable PredicateKey input) {
LoadBalancerStats stats = getLBStats();
if (stats == null) {
return true;
}
return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}


private boolean shouldSkipServer(ServerStats stats) {
if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
|| stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
return true;
}
return false;
}
//......
}

从上面代码,我们可以看到该策略的主要过滤逻辑:

  • 是否故障,即断路器是否生效已断开。
  • 实例的并发请求数大于阈值,默认Integer.MAX_VALUE,该配置可通过参数..ActiveConnectionsLimit 来修改。

这两项只要满足一个就返回false,代表节点可能故障或者负载过高,不适用处理请求,会被过滤掉,都不满足返回true,表示该节点可被选择用于处理请求。

除了上面的过滤方法,该策略的choose函数也做了一些改进优化,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class AvailabilityFilteringRule extends PredicateBasedRule { 
//......
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}

@Override
public Server choose(Object key) {
int count = 0;
Server server = roundRobinRule.choose(key);
while (count++ <= 10) {
if (predicate.apply(new PredicateKey(server))) {
return server;
}
server = roundRobinRule.choose(key);
}
return super.choose(key);
}
//......
}

可以看到,choose函数的实现逻辑并不像父类那样,先遍历所有节点进行过滤,然后在过滤后的集合中选择实例。

而是先以线性的方式选择一个实例,接着用过滤条件判断该实例是否满足要求,满足直接使用该实例,不满足选择下一个实例,并进行检查,如此循环进行,如果这个过程重复了10次还是没有找到符合要求的实例,就采用父类的实现方案。

该策略通过线性抽样的方式直接尝试寻找可用且较空闲的实例来使用,优化了父类每次都要遍历所有实例的开销。

ZoneAvoidanceRule

该策略也是PredicateBasedRule的实现类。可以看到它使用了CompositePredicate来进行服务清单过滤。这是一个组合过滤条件,在其构造函数中,它以ZoneAvoidancePredicate为主过滤条件,AvailabilityPredicate为次过滤条件来进行过滤。

1
2
3
4
5
6
7
8
9
10
11
12
public class ZoneAvoidanceRule extends PredicateBasedRule {
//.......
private CompositePredicate compositePredicate;

public ZoneAvoidanceRule() {
super();
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
//.......
}

ZoneAvoidanceRule在实现的时候并没有像AvailabilityFilteringRule那样重写choose函数来优化,所以它和父类一样,先过滤清单,再轮询选择。

过滤条件就是上面提到的两个组合条件,我们先来看下CompositePredicate的部分源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class CompositePredicate extends AbstractServerPredicate {

private AbstractServerPredicate delegate;

private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();

private int minimalFilteredServers = 1;

private float minimalFilteredPercentage = 0;

@Override
public boolean apply(@Nullable PredicateKey input) {
return delegate.apply(input);
}


public static class Builder {

private CompositePredicate toBuild;

Builder(AbstractServerPredicate primaryPredicate) {
toBuild = new CompositePredicate();
toBuild.delegate = primaryPredicate;
}

Builder(AbstractServerPredicate ...primaryPredicates) {
toBuild = new CompositePredicate();
Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
toBuild.delegate = AbstractServerPredicate.ofKeyPredicate(chain);
}

public Builder addFallbackPredicate(AbstractServerPredicate fallback) {
toBuild.fallbacks.add(fallback);
return this;
}

public Builder setFallbackThresholdAsMinimalFilteredNumberOfServers(int number) {
toBuild.minimalFilteredServers = number;
return this;
}

public Builder setFallbackThresholdAsMinimalFilteredPercentage(float percent) {
toBuild.minimalFilteredPercentage = percent;
return this;
}

public CompositePredicate build() {
return toBuild;
}
}

public static Builder withPredicates(AbstractServerPredicate ...primaryPredicates) {
return new Builder(primaryPredicates);
}

public static Builder withPredicate(AbstractServerPredicate primaryPredicate) {
return new Builder(primaryPredicate);
}

/**
* Get the filtered servers from primary predicate, and if the number of the filtered servers
* are not enough, trying the fallback predicates
*/
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}
}

由上面源码,可以看到CompositePredicate是可以支持多个过滤条件的,它们存储在fallbacks的List里。

我们指定传入的过滤条件参数顺序就是过滤条件的优先级,因为它们放入List后是有序的。

我们主要来看下getEligibleServers的逻辑:

  • 使用主过滤条件对所有实例过滤并返回过滤后的实例清单。
  • 依次使用次过滤条件列表中的过滤条件对上面的过滤结果进行过滤。
  • 每次过滤后(包括主过滤条件和次过滤条件),都需要判断下面两个条件,只要有一个不符合就不再进行过滤,将当前结果返回供线性轮询算法选择:
    • 过滤后的实例总数 >= 最小过滤实例数(minimalFilteredServers,默认为1).
    • 过滤后的实例比例 > 最小过滤百分比(minimalFilteredPercentage,默认为0).

对于传入的两个过滤条件,AvailabilityPredicate我们上面有介绍,我们来看下ZoneAvoidancePredicate

其主要逻辑部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ZoneAvoidancePredicate extends  AbstractServerPredicate {
//......
@Override
public boolean apply(@Nullable PredicateKey input) {
if (!ENABLED.get()) {
return true;
}
String serverZone = input.getServer().getZone();
if (serverZone == null) {
// there is no zone information from the server, we do not want to filter
// out this server
return true;
}
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
// no stats available, do not filter
return true;
}
if (lbStats.getAvailableZones().size() <= 1) {
// only one zone is available, do not filter
return true;
}
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) {
// The server zone is unknown to the load balancer, do not filter it out
return true;
}
logger.debug("Zone snapshots: {}", zoneSnapshot);
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null) {
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
//......
}

代码逻辑:

  • niws.loadbalancer.zoneAvoidanceRule.enabled 配置参数是否开启,如果为false,该过滤条件直接返回true。
  • 拿到实例的Zone,如果为空,该过滤条件直接返回true。
  • 拿到实例的LoadBalancerStats,如果为空或者可用Zone数量小于等于1,该过滤条件直接返回true。
  • 通过ZoneAvoidanceRule.createSnapshot函数拿到Zone映射,如果该映射里不包含该实例的Zone,该过滤条件直接返回true。
  • 否则通过ZoneAvoidanceRule.getAvailableZones拿到可用Zone列表,如果列表不为空,返回是否包含该实例的Zone结果;如果为空,直接返回false。

总结

以上就是关于Spring Cloud Ribbon 负载均衡策略的全部内容,通过了解Ribbon的负载均衡策略,可以使我们更好的了解到Ribbon的一些特性,对Ribbon有更深入的了解。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%