Fork me on GitHub

SpringCloud Ribbon 负载均衡器

前言

上篇文章我们介绍了Spring Cloud Ribbon中实现客户端负载均衡的一些基本脉络,了解了它的一些特点。

虽然Spring Cloud 中定义了 LoadBalancerClient作为负载均衡器的通用接口,并且针对Ribbon实现了RibbonLoadBalancerClient,但是它在具体实现客户端负载均衡时,是通过Ribbon的ILoadBalancer接口实现的。

我们这篇文章来看下ILoadBalancer接口的实现类,来了解它是如何实现客户端负载均衡的。

正文

AbstractLoadBalancer

AbstractLoadBalancerILoadBalancer接口的抽象实现。该类中定义了一个关于服务实例的分组枚举类ServerGroup,包含三种不同的类型。

  • ALL:所有服务实例。
  • STATUS_UP:正常服务的实例。
  • STATUS_NOT_UP:停止服务的实例。

另外还实现一个chooseServer()函数,该函数通过调用接口的chooseServer(Object key)实现,其中参数keynull,表示在选择具体服务实例时忽略key的条件判断。

还定义了两个抽象函数:

  • getServerList(ServerGroup serverGroup):根据分组类型来获取不同的服务列表。
  • getLoadBalancerStats():定义获取LoadBalancerStats对象的方法。

PS:LoadBalancerStats对象用来存储负载均衡器中各个服务实例当前的属性和统计信息。这些信息可以帮助我们观察负载均衡器的运行情况,制定合适的负载均衡策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class AbstractLoadBalancer implements ILoadBalancer {

public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}

public Server chooseServer() {
return chooseServer(null);
}

public abstract List<Server> getServerList(ServerGroup serverGroup);

public abstract LoadBalancerStats getLoadBalancerStats();
}

BaseLoadBalancer

BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,该类中定义了很多有关负载均衡器的基础内容。

  • 定义维护了两个存储服务实例Server对象列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的清单。

    1
    2
    3
    4
    5
    6
    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
    .synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections
    .synchronizedList(new ArrayList<Server>());
  • 定义了用来存储负载均衡服务器各服务实例属性和统计信息的LoadBalancerStats对象。

  • 定义了检查服务实例是否正常的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的具体实现。
  • 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。

    根据源码,我们可以看到该策略采用线性遍历ping服务实例的方式实现检查。该策略在当IPing的实现速度并不理想,或者Server列表过大时,可能会影响系统性能,这时候需要通过实现IPingStrategy接口并重写pingServers(IPing ping, Server[] servers)函数去扩展ping的执行策略。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private static class SerialPingStrategy implements IPingStrategy {

    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
    int numCandidates = servers.length;
    boolean[] results = new boolean[numCandidates];

    logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);

    for (int i = 0; i < numCandidates; i++) {
    results[i] = false; /* Default answer is DEAD. */
    try {
    if (ping != null) {
    results[i] = ping.isAlive(servers[i]);
    }
    } catch (Exception e) {
    logger.error("Exception while pinging Server: '{}'", servers[i], e);
    }
    }
    return results;
    }
    }

    比如我们想改成并行处理,则需要写自己的并行ping策略,如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class ParallelPingStrategy implements IPingStrategy {
    private static Logger logger = LoggerFactory
    .getLogger(ParallelPingStrategy.class);

    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
    int numCandidates = servers.length;
    boolean[] results = new boolean[numCandidates];

    logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);

    List<Future<Boolean>> futures = new ArrayList<>();

    ExecutorService ex = Executors.newFixedThreadPool(numCandidates+10);

    for (Server server : servers) {
    if(ping !=null){
    futures.add(ex.submit(()->ping.isAlive(server)));
    }
    }

    for (int i = 0; i < numCandidates; i++) {
    results[i] = false;
    try {
    if(!futures.isEmpty()){
    results[i] = futures.get(i).get();
    }
    }catch (Exception e){
    logger.error("Exception while pinging Server: '{}'", servers[i], e);
    }
    }

    return results;
    }
    }

    然后添加到配置即可。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Configuration
    public class CustomLoadBalancer {
    @Bean
    ILoadBalancer loadBalancer(){
    return new BaseLoadBalancer(new NIWSDiscoveryPing(),new RandomRule(),new ParallelPingStrategy());
    }
    }

    @EnableDiscoveryClient
    @SpringBootApplication
    @RibbonClient(name = "sakura",configuration = CustomLoadBalancer.class)
    public class SakuraConsumerApplication {
    //.....略
    }
  • 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancer中的chooseServer(Object key)源码,我们可以知道,负载均衡器实际将服务实例选择任务委托给了IRule实例中的choose函数来实现,而这里默认初始化了RoundRobinRule作为IRule的实现对象。RoundRobinRule实现的是最基本的线性负载均衡规则。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public Server chooseServer(Object key) {
    if (counter == null) {
    counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
    return null;
    } else {
    try {
    return rule.choose(key);
    } catch (Exception e) {
    logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
    return null;
    }
    }
    }
  • 启动ping任务:在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔时间为10s。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public BaseLoadBalancer() {
    this.name = DEFAULT_NAME;
    this.ping = null;
    setRule(DEFAULT_RULE);
    setupPingTask();
    lbStats = new LoadBalancerStats(DEFAULT_NAME);
    }
    void setupPingTask() {
    if (canSkipPing()) {
    return;
    }
    if (lbTimer != null) {
    lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
    true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
    }
  • 实现了ILoadBalancer接口定义的负载均衡器应具备以下一系列基本操作。

    • addServers(List newServers):向负载均衡器中添加新的服务实例列表。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      @Override
      public void addServers(List<Server> newServers) {
      if (newServers != null && newServers.size() > 0) {
      try {
      ArrayList<Server> newList = new ArrayList<Server>();
      newList.addAll(allServerList);
      newList.addAll(newServers);
      setServersList(newList);
      } catch (Exception e) {
      logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
      }
      }
      }
    • chooseServer(Object key):挑选一个具体服务实例。代码见上面。

    • markServerDown(Server server):标记某个服务实例暂停服务。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      public void markServerDown(Server server) {
      if (server == null || !server.isAlive()) {
      return;
      }

      logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId());
      server.setAlive(false);
      // forceQuickPing();

      notifyServerStatusChangeListener(singleton(server));
      }
    • getReachableServers():获取可用的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可。

      1
      2
      3
      4
      @Override
      public List<Server> getReachableServers() {
      return Collections.unmodifiableList(upServerList);
      }
    • getAllServers():获取所有的服务实例列表。同样由于BaseLoadBalancer中单独维护了一个所有服务的实例清单,所以直接返回即可。

      1
      2
      3
      4
      @Override
      public List<Server> getAllServers() {
      return Collections.unmodifiableList(allServerList);
      }

DynamicServerListLoadBalancer

DynamicServerListLoadBalancer类继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单在运行期的动态更新能力;同时它还具备了对服务实例清单的过滤功能。

我们来看下相比BaseLoadBalancer,该类新增了哪些内容。

  • ServerList

    在类成员定义中,我们可以发现新增了一个关于服务列表的操作对象ServerList serverListImpl,泛型 T 根据DynamicServerListLoadBalancer可知它是一个Server的子类。

    ServerList接口定义如下:

    1
    2
    3
    4
    5
    6
    7
    public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();

    public List<T> getUpdatedListOfServers();

    }
    • getInitialListOfServers():获取初始化的服务实例清单。
    • getUpdatedListOfServers():获取更新的服务实例清单。

      DynamicServerListLoadBalancer里默认是使用哪个ServerList的实现类呢?

      EurekaRibbonClientConfiguration类中,我们可以找到如下创建ServerList实例的内容。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
         @Bean
      @ConditionalOnMissingBean
      public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
      if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
      return this.propertiesFactory.get(ServerList.class, config, serviceId);
      }
      DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
      config, eurekaClientProvider);
      DomainExtractingServerList serverList = new DomainExtractingServerList(
      discoveryServerList, config, this.approximateZoneFromHostname);
      return serverList;
      }

      这里我们看到它构造函数传入了DiscoveryEnabledNIWSServerList,我们来看下代码。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

      private ServerList<DiscoveryEnabledServer> list;
      private final RibbonProperties ribbon;

      private boolean approximateZoneFromHostname;

      public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
      IClientConfig clientConfig, boolean approximateZoneFromHostname) {
      this.list = list;
      this.ribbon = RibbonProperties.from(clientConfig);
      this.approximateZoneFromHostname = approximateZoneFromHostname;
      }

      @Override
      public List<DiscoveryEnabledServer> getInitialListOfServers() {
      List<DiscoveryEnabledServer> servers = setZones(this.list
      .getInitialListOfServers());
      return servers;
      }

      @Override
      public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
      List<DiscoveryEnabledServer> servers = setZones(this.list
      .getUpdatedListOfServers());
      return servers;
      }
      //......
      }

      可以看到调用了DiscoveryEnabledNIWSServerListgetInitialListOfServers()方法和getUpdatedListOfServers()方法。这两个方法通过EurekaClient从服务中心中获取到具体的服务实例列表。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      @Override
      public List<DiscoveryEnabledServer> getInitialListOfServers(){
      return obtainServersViaDiscovery();
      }

      @Override
      public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
      return obtainServersViaDiscovery();
      }
      private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
      List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

      if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
      logger.warn("EurekaClient has not been initialized yet, returning an empty list");
      return new ArrayList<DiscoveryEnabledServer>();
      }

      EurekaClient eurekaClient = eurekaClientProvider.get();
      if (vipAddresses!=null){
      for (String vipAddress : vipAddresses.split(",")) {
      // if targetRegion is null, it will be interpreted as the same region of client
      List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
      for (InstanceInfo ii : listOfInstanceInfo) {
      if (ii.getStatus().equals(InstanceStatus.UP)) {

      if(shouldUseOverridePort){
      if(logger.isDebugEnabled()){
      logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
      }
      InstanceInfo copy = new InstanceInfo(ii);

      if(isSecure){
      ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
      }else{
      ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
      }
      }

      DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
      serverList.add(des);
      }
      }
      if (serverList.size()>0 && prioritizeVipAddressBasedServers){
      break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
      }
      }
      }
      return serverList;
      }
  • ServerListUpdater

    通过上面内容,我们已经知道Ribbon可以从Eureka Server中获取服务实例清单。那么它又是如何触发向Eureka Server去获取服务清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?

    这就要涉及到ServerListUpdater的内容了,我们来看一下。

    1
    2
    3
    4
    5
    6
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
    updateListOfServers();
    }
    };

    我们来看下ServerListUpdater接口内容。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public interface ServerListUpdater {
    //服务更新接口
    public interface UpdateAction {
    void doUpdate();
    }

    //启动服务更新
    void start(UpdateAction updateAction);

    //停止服务更新器
    void stop();

    //获取最近的更新时间戳
    String getLastUpdate();

    //获取上一次更新到现在的时间间隔,单位毫秒
    long getDurationSinceLastUpdateMs();

    //获取错过的更新周期数
    int getNumberMissedCycles();

    //获取核心线程数
    int getCoreThreads();
    }

    ServerListUpdater的实现类有两个,如下:

    • PollingServerListUpdater:动态服务列表更新的默认策略,使用的是定时任务的方式进行服务列表的更新。

    • EurekaNotificationServerListUpdater:利用Eureka事件监听器来驱动服务列表的更新操作。

  • ServerListFilter

    我们回到UpdateAction,看一下doUpdate的具体调用方法,可以看到它调用了updateListOfServers方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @VisibleForTesting
    public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
    servers = serverListImpl.getUpdatedListOfServers();
    LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
    getIdentifier(), servers);

    if (filter != null) {
    servers = filter.getFilteredListOfServers(servers);
    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
    getIdentifier(), servers);
    }
    }
    updateAllServerList(servers);
    }

    可以看到用到了前面ServerList的getUpdatedListOfServers方法。用于从Eureka Server中获取服务可用实例列表。

    同时引入了一个新对象 filter,它是通过ServerListFilter进行定义的。

    ServerListFilter接口只定义了一个方法,getFilteredListOfServers(List servers),用于实现对服务实例列表按照过滤规则进行过滤。

    1
    2
    3
    4
    5
    public interface ServerListFilter<T extends Server> {

    public List<T> getFilteredListOfServers(List<T> servers);

    }

    它有四个具体实现及一个抽象实现。

    • AbstractServerListFilter

      抽象过滤器,这里定一个一个重要依据对象LoadBalancerStats,用来存储关于负载均衡器的一些属性和统计信息。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {

      private volatile LoadBalancerStats stats;

      public void setLoadBalancerStats(LoadBalancerStats stats) {
      this.stats = stats;
      }

      public LoadBalancerStats getLoadBalancerStats() {
      return stats;
      }

      }
    • ZoneAffinityServerListFilter

      区域感知过滤器,该过滤器会基于“区域感知”方式实现服务过滤。也就是说它会根据提供服务的实例所处的区域(Zone)与消费者自身所处区域(Zone)进行比较,过滤掉不是同处于一个区域的实例。关键代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      @Override
      public List<T> getFilteredListOfServers(List<T> servers) {
      if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
      List<T> filteredServers = Lists.newArrayList(Iterables.filter(
      servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
      if (shouldEnableZoneAffinity(filteredServers)) {
      return filteredServers;
      } else if (zoneAffinity) {
      overrideCounter.increment();
      }
      }
      return servers;
      }
    • DefaultNIWSServerListFilter

      该过滤器完全继承自 ZoneAffinityServerListFilter,是默认的NIWS(Netflix Internal Web Service)过滤器。

      1
      2
      public class DefaultNIWSServerListFilter<T extends Server> extends ZoneAffinityServerListFilter<T> {
      }
    • ServerListSubsetFilter

      该过滤器也继承自ZoneAffinityServerListFilter,它非常适合用于拥有大规模服务器集群的系统。因为它可以产生一个“区域感知”结果的子集列表,同时它还能够通过比较服务实例的通信失败数量和并发连接数来判定该服务是否健康来选择性地从服务实例列表中剔除那些相对不够健康的实例。该过滤器实现主要有3步。

      1. 获取“区域感知”的过滤结果,作为候选的服务实例清单。
      2. 从当前消费者维护的服务实例子集中剔除那些相对不够健康的实例(同时也将这些实例从候选清单中剔除,防止第三步的时候又被选入),不够健康的标准如下。

        a. 服务实例的并发连接数超过客户端配置的值,默认0,配置参数为..ServerListSubsetFilter.eliminationConnectionThresold。

        b. 服务实例的失败数超过客户端配置的值,默认0,配置参数为..ServerListSubsetFilter.eliminationFailureThresold。

        c. 如果按符合上面任一规则的服务实例剔除后,剔除比例小于客户端默认配置的百分比,默认为0.1(10%),配置参数为..ServerListSubsetFilter.forceEliminationPercent,那么就先对剩下的实例列表进行健康排序,再从最不健康的实例进行剔除,直到达到配置剔除的百分比。

      3. 在完成剔除后,清单至少已经少了10%(默认值)的服务实例,最后通过随机的方式从候选清单中选出一批实例加入到清单中,以保持服务实例子集与原来的数量一致,默认实例子集数量为20,配置参数..ServerListSubsetFilter.size。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        @Override
        public List<T> getFilteredListOfServers(List<T> servers) {
        List<T> zoneAffinityFiltered = super.getFilteredListOfServers(servers);
        Set<T> candidates = Sets.newHashSet(zoneAffinityFiltered);
        Set<T> newSubSet = Sets.newHashSet(currentSubset);
        LoadBalancerStats lbStats = getLoadBalancerStats();
        for (T server: currentSubset) {
        // this server is either down or out of service
        if (!candidates.contains(server)) {
        newSubSet.remove(server);
        } else {
        ServerStats stats = lbStats.getSingleServerStat(server);
        // remove the servers that do not meet health criteria
        if (stats.getActiveRequestsCount() > eliminationConnectionCountThreshold.get()
        || stats.getFailureCount() > eliminationFailureCountThreshold.get()) {
        newSubSet.remove(server);
        // also remove from the general pool to avoid selecting them again
        candidates.remove(server);
        }
        }
        }
        int targetedListSize = sizeProp.get();
        int numEliminated = currentSubset.size() - newSubSet.size();
        int minElimination = (int) (targetedListSize * eliminationPercent.get());
        int numToForceEliminate = 0;
        if (targetedListSize < newSubSet.size()) {
        // size is shrinking
        numToForceEliminate = newSubSet.size() - targetedListSize;
        } else if (minElimination > numEliminated) {
        numToForceEliminate = minElimination - numEliminated;
        }

        if (numToForceEliminate > newSubSet.size()) {
        numToForceEliminate = newSubSet.size();
        }

        if (numToForceEliminate > 0) {
        List<T> sortedSubSet = Lists.newArrayList(newSubSet);
        Collections.sort(sortedSubSet, this);
        List<T> forceEliminated = sortedSubSet.subList(0, numToForceEliminate);
        newSubSet.removeAll(forceEliminated);
        candidates.removeAll(forceEliminated);
        }

        // after forced elimination or elimination of unhealthy instances,
        // the size of the set may be less than the targeted size,
        // then we just randomly add servers from the big pool
        if (newSubSet.size() < targetedListSize) {
        int numToChoose = targetedListSize - newSubSet.size();
        candidates.removeAll(newSubSet);
        if (numToChoose > candidates.size()) {
        // Not enough healthy instances to choose, fallback to use the
        // total server pool
        candidates = Sets.newHashSet(zoneAffinityFiltered);
        candidates.removeAll(newSubSet);
        }
        List<T> chosen = randomChoose(Lists.newArrayList(candidates), numToChoose);
        for (T server: chosen) {
        newSubSet.add(server);
        }
        }
        currentSubset = newSubSet;
        return Lists.newArrayList(newSubSet);
        }
    • ZonePreferenceServerListFilter

      Spring Cloud整合是新增的过滤器。若使用Spring Cloud整合Eureka 和 Ribbon时会默认使用该过滤器。它实现通过配置或者Eureka实例元数据的所属区域(Zone)来过滤出同区域的服务实例。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      @Override
      public List<Server> getFilteredListOfServers(List<Server> servers) {
      List<Server> output = super.getFilteredListOfServers(servers);
      if (this.zone != null && output.size() == servers.size()) {
      List<Server> local = new ArrayList<>();
      for (Server server : output) {
      if (this.zone.equalsIgnoreCase(server.getZone())) {
      local.add(server);
      }
      }
      if (!local.isEmpty()) {
      return local;
      }
      }
      return output;
      }

ZoneAwareLoadBalancer

ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer的扩展。在DynamicServerListLoadBalancer中,并没有重写选择具体服务实例的chooseServer函数,所以它依然会采用BaseLoadBalancer中实现的算法。使用RoundRobinRule规则,以线性轮询的方式来选择调用的服务实例,该算法并没有区域(Zone)的概念。这样就会周期性的产生跨区域访问情况,由于跨区域有更高的延迟,这些区域实例主要以防止区域性故障来实现高可用的目的,而不能作为常规的访问性实例,所以在多区域部署的情况下会有一定的问题。

ZoneAwareLoadBalancer则可以避免这样的问题,我们来看下它是如何实现的。

我们可以看到它并没有重写setServersList,而是重写了setServerListForZones,我们先来看下DynamicServerListLoadBalancer的部分代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
// make sure ServerStats is created to avoid creating them on hot
// path
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}

protected void setServerListForZones(
Map<String, List<Server>> zoneServersMap) {
LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}

可以看到setServersList在最后调用了setServerListForZones方法,而ZoneAwareLoadBalancer重写了setServerListForZones方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
// check if there is any zone that no longer has a server
// and set the list to empty so that the zone related metrics does not
// contain stale data
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}

可以看到它创建了一个名为balancersConcurrentHashMap,用来存储每个Zone区域对应的负载均衡器。

再来看下它的chooseServer方法,看它如何挑选实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}

if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}

由源码可知,只有当负载均衡器中维护的实例所属的Zone区域个数大于1的时候才会执行选择策略。可以看到当区域个数大于1时,使用的规则为ZoneAvoidanceRule

总结

通过本篇文章,我们大致了解了几种Ribbon客户端负载均衡器的一些特点。后面我们将对负载均衡的一些策略做相关整理及梳理。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%