通过断路器源码理解断路器原理

一 断路器原理图

参考:

https://raw.githubusercontent.com/wiki/Netflix/Hystrix/images/circuit-breaker-1280.png

二 断路器源码解读(最新版本)

1 源码来源:https://gitee.com/mirrors/hystrix

2 源码解读

public interface HystrixCircuitBreaker {//每个Hystrix命令的请求都通过它判断是否被执行boolean allowRequest();//返回断路器是否打开boolean isOpen();//关闭断路器,作为半开状态的反馈机制void markSuccess();//打开断路器,作为半开状态的反馈机制void markNonSuccess();//* Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal* state.*/boolean attemptExecution();//维护了Hystrix命令与HystrixCircuitBreaker的关系集合class Factory {// String 类型的key通过HystrixCommandKey定义,每个Hystrix命令需要有一个key来标识,同时一个Hystrix命令//也会在集合中找到它的对应HystrixCircuitBreakerprivate static ConcurrentHashMap circuitBreakersByCommand = new ConcurrentHashMap();public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {// this should find it for all but the first timeHystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());if (previouslyCached != null) {return previouslyCached;}HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));if (cbForCommand == null) {// this means the putIfAbsent step just created a new one so let's retrieve and return itreturn circuitBreakersByCommand.get(key.name());} else {// this means a race occurred and while attempting to 'put' another one got there before// and we instead retrieved it and will now return itreturn cbForCommand;}}public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {return circuitBreakersByCommand.get(key.name());}/* package */static void reset() {circuitBreakersByCommand.clear();}}//是断路器接口HystrixCircuitBreaker的实现类/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {//断路器对应HystrixCommand实例的属性对象private final HystrixCommandProperties properties;//用来让HystrixCommand记录各类度量的对象private final HystrixCommandMetrics metrics;//断路器的三种状态enum Status {CLOSED, OPEN, HALF_OPEN;}private final AtomicReference status = new AtomicReference(Status.CLOSED);//断路器是否为打开,默认为falseprivate final AtomicLong circuitOpened = new AtomicLong(-1);private final AtomicReference activeSubscription = new AtomicReference(null);protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {this.properties = properties;this.metrics = metrics;//On a timer, this will set the circuit between OPEN/CLOSED as command executions occurSubscription s = subscribeToStream();activeSubscription.set(s);}private Subscription subscribeToStream() {/** This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream*/return metrics.getHealthCountsStream().observe().subscribe(new Subscriber() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(HealthCounts hc) {// 如果总的请求(QPS)在预设的阈值范围内进入此分支。//circuitBreakerRequestVolumeThreshold默认值为20if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {// we are not past the minimum volume threshold for the stat window,// so no change to circuit status.// if it was CLOSED, it stays CLOSED// if it was half-open, we need to wait for a successful command execution// if it was open, we need to wait for sleep window to elapse} else {//如果错误百分比在阈值范围内,进入此分支if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {//we are not past the minimum error threshold for the stat window,// so no change to circuit status.// if it was CLOSED, it stays CLOSED// if it was half-open, we need to wait for a successful command execution// if it was open, we need to wait for sleep window to elapse} else {// 如果不满足上面两个条件,则将断路器设置为打开状态,同时记录时间戳if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {circuitOpened.set(System.currentTimeMillis());}}}}});}//该函数在半开时使用,若Hystrix命令调用成功,通过它将打开的断路器关闭,并重置度量指标对象@Overridepublic void markSuccess() {if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {metrics.resetStream();Subscription previousSubscription = activeSubscription.get();if (previousSubscription != null) {previousSubscription.unsubscribe();}Subscription newSubscription = subscribeToStream();activeSubscription.set(newSubscription);circuitOpened.set(-1L);}}@Overridepublic void markNonSuccess() {if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {//This thread wins the race to re-open the circuit - it resets the start time for the sleep windowcircuitOpened.set(System.currentTimeMillis());}}@Overridepublic boolean isOpen() {if (properties.circuitBreakerForceOpen().get()) {return true;}if (properties.circuitBreakerForceClosed().get()) {return false;}return circuitOpened.get() >= 0;}@Overridepublic boolean allowRequest() {//根据配置对象properties中断路器判断强制打开或关闭属性是否设置//如果强制打开,就直接返回falseif (properties.circuitBreakerForceOpen().get()) {return false;}//如果强制关闭,会允许所有请求if (properties.circuitBreakerForceClosed().get()) {return true;}//默认情况下,不会进入上面两个分支//如果断路器是关闭的,允许访问if (circuitOpened.get() == -1) {return true;} else {//如果是半开状态,不允许访问if (status.get().equals(Status.HALF_OPEN)) {return false;} else {//休眠期判断,休眠期内部允许访问,默认5秒,过了5秒,可以访问return isAfterSleepWindow();}}}//判断休眠期是否过了private boolean isAfterSleepWindow() {final long circuitOpenTime = circuitOpened.get();final long currentTime = System.currentTimeMillis();final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();return currentTime > circuitOpenTime + sleepWindowTime;}@Overridepublic boolean attemptExecution() {if (properties.circuitBreakerForceOpen().get()) {return false;}if (properties.circuitBreakerForceClosed().get()) {return true;}if (circuitOpened.get() == -1) {return true;} else {//休眠期过了,设置为半开状态if (isAfterSleepWindow()) {//only the first request after sleep window should execute//if the executing command succeeds, the status will transition to CLOSED//if the executing command fails, the status will transition to OPEN//if the executing command gets unsubscribed, the status will transition to OPENif (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {return true;} else {return false;}} else {return false;}}}}//定义了一个什么都不做的断路器,它允许所有的请求,并且断路器状态始终闭合/* package */static class NoOpCircuitBreaker implements HystrixCircuitBreaker {@Overridepublic boolean allowRequest() {return true;}@Overridepublic boolean isOpen() {return false;}@Overridepublic void markSuccess() {}@Overridepublic void markNonSuccess() {}@Overridepublic boolean attemptExecution() {return true;}}}

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部