Fork me on GitHub

Spring Cloud Hystrix工作原理

前言

在上一篇Spring Cloud Hystrix服务容错保护入门文章里,我们简单的了解了下 Spring Cloud Hystrix,并对 Hystrix 源码中使用到的两种模式进行了简单介绍,以方便我们更好的的了解 Hystrix 的原理。

这篇文章,我们将结合源码等来了解下 Hystrix 的工作原理。

正文

工作流程

Netflix Hystrix 官网上How-it-Works,说明了 当一个请求调用了相关依赖后Hystrix是如何工作的。

我们通过官方的流程图来说明下:

upload successful

我们来根据图示序号来看下Hystrix的工作流程。

  1. 创建HystrixCommandHystrixObservableCommand对象

首先,构建一个HystrixCommandHystrixObservableCommand对象,用来表示对依赖服务的操作请求,同时传递所有需要的参数。

上篇文章我们已经讲到它们采用了“命令模式”来实现对服务调用操作的封装。

  • HystrixCommand:用在依赖的服务返回单个操作结果的时候。
  • HystrixObservableCommand:用在依赖的服务返回多个操作结果的时候。
  1. 命令执行

图中的4种命令执行方式我们也有简单介绍。

HystrixCommand的两种执行方式:

  • execute():同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。
  • queue():异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。

HystrixObservableCommand的两种执行方式:

  • observe():返回Observable对象,它代表了操作的多个结果,是一个 Hot Observable
  • toObservable():同样返回Observable对象,也代表了操作的多个结果,但它是一个Cold Observable

这儿的观察者-订阅者模式还有Hot ObservableCold Observable我们在上篇文章已经介绍,这儿就不过多叙述。

看表面的话,大家一定认为只有HystrixObservableCommand使用了RxJava ,实际上HystrixCommand的两种命令也使用RxJava来实现。

我们来看下两个命令的部分源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}

public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}

@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}

};
//处理立即抛出的错误
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
return f;
default:
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
  • execute()是通过queue()返回的异步对象Future<R>get()方法来实现同步执行的。
  • queue()则是通过toObservable()来获得一个Cold Observable,然后通过toBlocking()转化为BlockingObservable,它可以把数据以阻塞的方式发射出来。toFuture()则把BlockingObservable转换为一个Future,该方法只是创建一个Future返回,并不会阻塞,这使得消费者可以自行决定如何处理异步操作。
  1. 结果是否被缓存

若当前命令的请求缓存功能是被启用的,并且该命令缓存命中,那么缓存的结果会立即以Observable对象的形式返回。

  1. 断路器是否打开

在命令结果没有缓存命中的时候,Hystrix在执行命令前需要检查断路器是否为打开状态:

  • 如果断路器是打开的,那么Hystrix不会执行命令,而是转接到fallback处理逻辑(对应第8步)。
  • 如果断路器是关闭的,那么Hystrix跳到第5步,检查是否有可用资源来执行命令。

关于断路器的内容我们后面在详述。

  1. 线程池/请求队列/信号量是否占满

如果与命令相关的线程池和请求队列,或者信号量(不使用线程池的时候)已经被占满,那么Hystrix也不会执行命令,而是转接到fallback处理逻辑(对应第8步)。

需要注意的是,这里Hystrix所判断的线程池并非容器的线程池,而是每个依赖服务的专有线程池。Hystrix为了保证不会因为某个依赖服务的问题影响到其他依赖服务而采用了“舱壁模式(Bulkhead Pattern)”来隔离每个依赖的服务。

关于该方面的内容我们后面介绍。

  1. HystrixObservableCommand.construct()HystrixCommand.run()

Hystrix会根据我们编写的方法来决定采取什么样的方式去请求依赖服务。

  • HystrixCommand.run():返回一个单一的结果,或者抛出异常。
  • HystrixObservableCommand.construct():返回一个Observable对象来发射多个结果,或者通过onError发送错误通知。

如果run()construct()方法的执行时间超过了命令设置的超时阈值,当前处理的线程将会抛出一个TimeoutException(如果该命令不在其自身的线程中执行,则会通过单独的计时线程来抛出)。在这种情况下,Hystrix会转接到fallback处理逻辑(第8步)。同时,如果当前命令没有被取消或者中断,那么它最终会忽略run()construct()方法的返回。

如果命令没有抛出异常并返回了结果,那么Hystrix在记录一些日志并采集监控报告之后将该结果返回。在使用run()的情况下,Hystrix会返回一个Observable,它发射单个结果并产生onCompleted()的结束通知;而在使用construct()的情况下,Hystrix会直接返回该方法产生的Observable对象。

  1. 计算断路器的健康度

Hystrix会将“成功”、“失败”、“拒绝”、“超时”等信息报告给断路器,而断路器会维护一组计数器来统计这些数据。

断路器会使用这些统计数据来决定是否要将断路器打开,来对某个依赖服务的请求进行“熔断/短路”,直到恢复期结束。若在恢复期结束后,根据统计数据判断如果还是未达到健康指标,就再次“熔断/短路”。

  1. fallback处理

当命令执行失败的时候,Hystrix会进入fallback尝试回退处理,我们通常也称之为“服务降级”。而能够引起服务降级处理的情况有下面几种:

  • 第4步,当前命令处于“熔断/短路”状态,断路器是打开的时候。
  • 第5步,当前命令的线程池、请求队列或者信号量被占满的时候。
  • 第6步,HystrixCommand.run()或者HystrixObservableCommand.construct()抛出异常的时候。

在服务降级逻辑中,我们需要实现一个通用的响应结果,并且该结果的处理逻辑应当是从缓存或是根据一些静态逻辑来获取,而不是依赖网络请求获取。如果一定要在降级逻辑中包含网络请求,那么该请求也必须被包装在HystrixCommand或者HystrixObservableCommand中,从而形成级联的降级策略,而最终的降级逻辑一定不是一个依赖网络请求的处理,而是一个能够稳定的返回结果的处理逻辑。

HystrixCommandHystrixObservableCommand服务降级逻辑实现不同:

  • 当使用HystrixCommand时,通过实现HystrixCommand.getFallback()来实现服务降级逻辑。
  • 当使用HystrixObservableCommand时,通过HystrixObservableCommand.resumeWithFallback()实现服务降级逻辑,该方法会返回一个Observable对象来发射一个或者多个降级结果。

当命令的降级逻辑返回结果之后,Hystrix就会将该结果返回给调用者。当使用HystrixCommand.getFallback()时,它会返回一个Observable对象,该对象会发射getFallback()的处理结果。当使用HystrixObservableCommand.resumeWithFallback()时,它会将Observable对象直接返回。

如果我们没有为命令实现降级逻辑或者降级逻辑处理中抛出了异常,Hystrix依然会返回一个Observable对象,但是它不会发射任何结果数据,而是通过onError()方法通知命令立即中断请求,并通过onError()方法将引起命令失败的异常发给调用者。实现一个有可能失败的降级逻辑是一种非常糟糕的做法,我们应该在实现降级策略时尽可能避免失败的情况。

当然,完全不可能出现失败的完美降级策略是不存在的,如果降级执行发现失败的时候,Hystrix会根据不同的执行方法做不同的处理。

  • execute():抛出异常。
  • queue():正常返回Future对象,但是当调用get()来获取结果的时候会抛出异常。
  • observe():正常返回Observable对象,当订阅它的时候,将立即通过调用订阅者的onError()方法来通知终止请求。
  • toObservable():正常返回Observable对象,当订阅它的时候,将通过调用订阅者的onError()方法来通知终止请求。
  1. 返回成功的响应

Hystrix命令执行成功之后,它会将处理结果直接返回或是以Observable的形式返回。而具体哪种方式取决于之前第2步中我们所提到的对命令的4种不同执行方式,下图中总结了这4种调用方式之前的依赖关系。

upload successful

  • toObservable():返回最原始的Observable,必须通过订阅它才会真正触发命令的执行流程。
  • observe():在toObservable()产生原始Observable之后立即订阅它,让命令能够马上开始异步执行,并返回一个Observable对象,当调用它的subscribe时,将重新产生结果和通知给订阅者。
  • queue():将toObservable()产生的原始Observable通过toBlocking()方法转换成BlockingObservable对象,并调用它的toFuture()方法返回异步的Future对象。
  • execute():在queue()产生异步结果Future对象之后,通过调用get()方法阻塞并等待结果的返回。

断路器原理

断路器在HystrixCommandHystrixObservableCommand执行过程中起到了至关重要的作用,我们来看下断路器的原理。

断路器HystrixCircuitBreaker的定义如下:

1
2
3
4
5
6
7
8
public interface HystrixCircuitBreaker {
public boolean allowRequest();
public boolean isOpen();
void markSuccess();
public static class Factory{...}
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker{...}
static class NoOpCircuitBreaker implements HystrixCircuitBreaker{...}
}

可以看到断路器接口定义了三个抽象方法:

  • allowRequest():每个 Hystrix 命令的请求都通过它判断是否被执行。
  • isOpen():返回当前断路器是否打开。
  • markSuccess():用来闭合断路器。

另外还有三个静态类。

  • Factory:它维护了一个Hystrix命令与HystrixCircuitBreaker的关系集合。其中String类型的key通过HystrixCommandKey定义。每一个Hystrix命令都需要一个key来标识。一个Hystrix命令也会在该集合中找到它对应的HystrixCircuitBreaker实例。

    1
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
  • NoOpCircuitBreaker:它定义了一个什么都不做的断路器实现,它允许所有请求,并且断路器状态始终闭合。

  • HystrixCircuitBreakerImpl:它是HystrixCircuitBreaker的实现类,我们来分析一下它。

HystrixCircuitBreakerImpl

HystrixCircuitBreakerImpl定义了断路器的4个核心对象。

  • HystrixCommandProperties properties:断路器对应HystrixCommand实例的属性对象。
  • HystrixCommandMetrics metrics:用来让HystrixCommand记录各类度量指标的对象。
  • AtomicBoolean circuitOpen:断路器是否打开的标志,默认false。
  • AtomicLong circuitOpenedOrLastTestedTime:断路器打开或是上一次测试的时间戳。

HystrixCircuitBreakerImplHystrixCircuitBreaker接口的各个方法实现如下。

isOpen()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}

// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();

// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}

if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}

具体逻辑如下:

  1. 如果断路器打开标识为true,就返回true,表示断路器处于打开状态。否则,就从metrics中获取HealthCounts做判断(该对象记录了一个滚动时间窗内的请求信息快照,默认时间窗为10s)。
  2. 如果它的请求总数(QPS)在预设的阈值范围内就返回false,表示断路器处于闭合状态。该参数配置为circuitBreakerRequestVolumeThreshold,默认值为20。
  3. 如果错误百分比在阈值之内就返回false,表示断路器处于闭合状态。该配置参数为circuitBreakerErrorThresholdPercentage,默认值为50。
  4. 如果上面的两个条件都不满足,则将断路器设置为打开状态。同时如果是从关闭状态切换到打开状态的话,就将当前时间记录到circuitOpenedOrLastTestedTime对象中。

allowRequest()

1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest();
}

具体逻辑如下:

  1. 先检查配置对象properties中的强制打开或关闭属性是否被设置。如果强制打开,就返回false,拒绝请求。如果强制关闭,会允许所有请求,但它也会调用isOpen()来执行断路器计算逻辑,用来模拟断路器行为。
  2. 默认情况下,断路器不会进入到上述分支,而是通过!isOpen() || allowSingleTest()来判断当前断路器是否打开。

我们再来看下allowSingleTest()的相关代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
}
}
return false;
}

具体逻辑如下:

  1. 拿到之前记录的时间戳circuitOpenedOrLastTestedTime
  2. 当断路器打开时,判断时间戳+配置中的circuitBreakerSleepWindowInMilliseconds是否小于当前时间,是的话,就将当前时间更新到circuitOpenedOrLastTestedTime中,并且允许此次请求。

可以看到circuitBreakerSleepWindowInMilliseconds属性设置了一个断路器打开之后的休眠时间(默认5s),在该休眠时间达到后,将再次允许请求尝试访问,此时断路器处于“半开”状态,若此时请求继续失败,断路器又进入打开状态,并且等待下一个休眠窗口过去之后再次尝试;如果请求成功,则将断路器重新置于关闭状态。所以通过isOpen()allowSingleTest()方法的配合,实现了断路器打开和关闭状态的切换。

markSuccess()

1
2
3
4
5
6
7
8
9
10
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
//win the thread race to reset metrics
//Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
//and all other metric consumers are unaffected by the reset
metrics.resetStream();
}
}
}

该方法用来在断路器“半开路”状态时使用。如果Hystrix命令调用成功,通过调用它将打开的断路器关闭,并重置度量指标对象。

下图是Netflix Hystrix官方文档中关于断路器的详细执行逻辑图,可以帮助我们理解相关内容。

upload successful

依赖隔离

Hystrix使用”舱壁模式“实现线程池的隔离。它会为每一个依赖服务创立一个独立的线程池,这样就算某个依赖服务出现延迟过高的情况,也只是对该依赖服务的调用产生影响,而不会拖慢其他的依赖服务。

通过实现对依赖服务的线程池隔离,可以带来如下优势:

  • 应用自身得到完全保护,不会受不可控的依赖服务影响。即便给依赖服务分配的线程池被填满,也不会影响应用自身的其余部分。
  • 可以有效降低接入新服务的风险。如果新服务接入后运行不稳定或存在问题,完全不会影响到应用其他请求。
  • 当依赖的服务从失效恢复正常后,它的线程池会被清理并且能够马上恢复健康的服务,相比之下,容器级别的清理恢复速度要慢得多。
  • 当依赖的服务出现配置错误的时候,线程池会快速反映出此问题(通过失败次数、延迟、超时、拒绝等指标的增加情况)。同时,我们可以在不影响应用功能的情况下通过实时的动态属性刷新(Spring Cloud Config)来处理它。
  • 当依赖的服务因实现机制调整等原因造成其性能出现很大变化时,线程池的监控指标信息会反映出这样的变化。同时,我们也可以通过实时动态刷新自身应用对依赖服务的阈值进行调整以适应依赖方的改变。
  • 每个专业线程池都提供了内置的并发实现,可以利用它为同步的依赖服务构建异步访问。

虽然线程池隔离的方案带来了如此多的好处,但我们可能会担心为每一个依赖服务都分配一个线程池是否会过多地增加系统的负载和开销。对于这一点,Netflix在设计Hystrix时,认为线程池上的开销相对于隔离所带来的的好处是无法比拟的。同时,Netflix也针对线程池的开销做了相关测试,以用结果打消Hystrix实现对性能影响的顾虑。

下图是Netflix Hystrix官方提供的一个Hystrix命令的性能监控图,该命令以每秒60个请求的速度(QPS)对一个单服务实例进行访问,该服务实例每秒运行的线程数峰值为350个。

upload successful

通过上图我们可以看出,使用线程池隔离与不使用线程池隔离的耗时差异如下:

比较情况未使用线程池隔离使用了线程池隔离耗时差距
中位数2ms2ms2ms
90百分位5ms8ms3ms
99百分位28ms37ms9ms

在99%的情况想,使用线程池隔离的延迟有9ms,对于大多数需求来说这样的消耗是微乎其微的,更何况可为系统在稳定性和灵活性上带来巨大提示。

Hystrix中除了可使用线程池之外,还可以使用信号量来控制单个依赖服务的并发度,信号量的开销远比线程池的开销小,但是它不能设置超时和实现异步访问。所以,只有在依赖服务是足够可靠的情况下才使用信号量。在HystrixCommandHystrixObservableCommand中有两处支持信号量的使用。

  • 命令执行:如果将隔离策略参数execution.isolation.strategy设置为SEMAPHOREHystrix会使用信号量代替线程池来控制依赖服务的并发。
  • 降级逻辑:当Hystrix尝试降级逻辑时,它会在调用线程中使用信号量。

信号量的默认值为10,我们也可以通过动态刷新配置的方式来控制并发线程的数量。对于信号量大小的估算方法与线程池并发度估算类似。仅访问内存数据的请求一般耗时在1ms以内,性能可以达到 5000rps(rps指每秒请求数),这样级别的请求可以将信号量设置为1或者2,我们可以按照此标准并依据实际请求耗时来设置信号量。

结语

本篇我们通过了解Hystrix的部分内容,借助官方文档,可以对Hystrix的工作原理有个大概了解,下篇文章我们将详细介绍Hystrix的使用方法,如创建请求、服务降级、异常处理、请求缓存、请求合并等。

参考文档

  1. Spring Cloud 微服务实战
  2. Netflix Hystrix



-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%