SpringCloud Hystrix源码解析
看本篇之前请看
五分钟学会 Spring Cloud Hystrix:服务容错保护(小白必看,一看就会系列教程)
使用Hystrix 后的远程调用流程如下: 1 )构建HystrixCommand 或者Hys trixObservableCommand 对象。 2 )执行命令。 3 )检查是否有相同命令执行的缓存。 4 )检查断路器是否打开。 5 )检查线程池或者信号量是否被消耗完。 6 )调用Hystri xOb servabl eCommand#construct 或Hystri xCommand#run 执行被封装的远程调用逻辑。 7 )计算链路的健康情况。 8 )在命令执行失败时获取Fallback 逻辑。 9 )返回成功的Observable 。
封装HystrixCommand
@HystrixCommand 注解 在基础应用中我们使用@HystrixCommand 注解来包装需要保护的远程调用方法。首先查看该注解的相关属性,代码如下所示:
/ / HystrixComrnand . Java
@Target ( {
Element Type . METHOD} )
@Retentio 口( RetentionPolicy . RUNTIME)
@Inherited
@Document ed
pu b l 工C @int er face Hys t r ixComrnand {
//命令分组键用于报告、预警以及面板展示
//默认为被注解方法的运行时类名
String groupKey ( ) default
//日ystr 工X 的命令键,用于区分不同的注解方法
//默认为注解方法的名称
String commandKey ( ) default
//线程池键用来指定命令执行的HystrixTh readPool
Str 工ng threadPoolKey ( ) defaul t ”” ;
第6 章断路器Hystrix •! • 125
//指定Fallback方法名, Fallback方法也可以被HystrixCommand注解
String fallbackMethod ( ) default
//自定义命令的相关配置
HystrixProperty[ ) commandProperties ( ) default {
} ;
//自定义线程池的相关配置
HystrixProperty [ J threadPoolProperties ( ) default {
} ;
//定义忽略哪些异常
Class< ? ext ends Throwable> [ ) ignoreExceptions ( ) default {
} ;
//默认的fallback
String defaultFallback ( ) default “”;
}
一般来说,对于HystrixCommand 的配置,仅需要关注fallbac灿fothod 方法,当然如果对命令和线程池有特定需要,可以进行额外的配置。除了@HystrixCommand 还有一个@Hystrix Collapser 注解用于请求合并操作,但是需要与@HystrixCommand 结合使用, 批量操作的方法必须被@HystrixCommand 注解。
HystrixCommandAspect 切面 被注解修饰的方法将会被HystrixCommand 包装执行,在Hystrix 中通过Aspectj 切面的方式来将被注解修饰的方法进行封装调用。
1 )通过MetaHolderFactory 构建出被注解修饰方法中用于构建HystrixCommand 必要信息集合类MetaHolder 。 2 )根据MetaHolder 通过HystrixCommandFactory 构建出合适的HystrixCommand 。 3 )委托CommandExecutor 执行HystrixCommand ,得到结果。 MetaHolder 持有用于构建HystrixCommand 和与被包装方法相关的必要信息,如被注解的方法、失败回滚执行的方法和默认的命令键等属性。
@Immutable
public final class MetaHolder {
private fina l Method method ; //被注解的方法
pr 工vate final Method cacheKeyMethod ;
pri vate final Method ajcMethod ;
private final Method fallbackMethod ; //失败回滚执行的方法
private final String defaultGroupKey ; // 默认的group键
private final String defaultCommandKey ; //默认的命令键
private f inal String defaultCollapserKey ; //默认的合并请求键
private f 工nal String defaultThreadPoolKey ; //默认的线程池键
private final Executiontype executiontype ; I I 执行类型
}
在HystrixCommandFactory 类中,用于创建HystrixCommand 的方法如下所示:
public Hystrxinvokable create ( MetaHolder metaHolder) {
HystriX invokable executable ;
//构建请求合并的命令
if ( metaHolder . isCollapserAnnotationPresent ( ) ) (
executable = new CommandCollapser ( metaHolder) ;
} else if ( metaHolder isObservable ( ) ) (
executable = 口ew GenericObservableCommand ( HystrixCommandBui lderFactory .
getinstance ( ) create ( metaHolder) ) ;
else (
executable = new GenericCommand ( Hystrix CommandBu lderFactory . getinstance ( ) .
create ( metaHolder) ) ;
return executable;
}
根据MetaHolder# isObservable 方法返回属性的不同, 将会构建不同的命令,比如HystrixCommand 或者HystrixObservableCommand ,前者将同步或者异步执行命令, 后者异步回调执行命令。Hystrix 根据被包装方法的返回值来决定命令的执行方式
根据被包装方法的返回值类型决定命令执行的Execution Type ,从而决定构建HystrixCommand 还是HystrixObservableCommand 。其中Future 类型的返回值将会被异步执行, rx 类型的返回值将会被异步回调执行,其他的类型将会被同步执行。 CommandExecutor 根据MetaHolder 中ExecutionType 执行类型的不同,选择同步执行、异步执行还是异步回调执行,返回不同的执行结果。同步执行,直接返回结果对象;异步执行,返回Future ,封装了异步操作的结果;异步回调执行将返回Observable ,封装响应式 执行的结果,可以通过它对执行结果进行订阅,在执行结束后进行特定的操作。
HystrixCommand 类结构
虽然类图很复杂,但是最终实现类只有三个,分别是同步或异步执行命令的GenericCommand ; 请求合并执行命令的BatchHystrixCommand , 以及异步回调执行命令的GenericObservableCommand
异步回调执行命令
在observe 方法中, 首先将创建一个方法ReplaySu均ect, rx 中的Subject 既是一个Observable 也是一个Observer 。接着调用toObservable 方法获取到懒执行的Observable,通过创建的ReplaySubject 订阅该Observable ,启动Observable 中相关命令, 同时返回ReplaySubject 给后续的观察者,用于订阅来获取执行结果( ReplaySubject 会推送所有来自原始Observable 的事件给观察者,无论它们是何时订阅的) 。
AbstractCommand#toObservable 1 ) 首先通过Observable#defer 方法来构建返回的Observable a 以Observable#defer 方式声明的Observable 只有当有观察者订阅才会真正开始创建,并且是为每一个观察者创建一个新的Observable ,这就保证了toObservable 方法返回的Observable 是纯净的,并没有 开始执行命令。 2 )在构建Observable 过程中,先通过commandState 查看当前的命令执行状态,保证命令未开始执行并且每条命令只能执行一次。 3 )如果允许请求缓存并且缓存存在的话,将尝试从缓存中获取对应的执行结果,并直接返回结果。 4 )如果无法获取缓存,通过applyHystrixSemantics 方法构建用于返回的Observable 。 5 )如果允许请求缓存,将ObsErvable 放置到缓存中用于下一次调用。 6 ) 最后为返回Observable 添加提前定义好的回调方法。 在上述的流程中,需要重点关注两个地方,一个是HystrixRequestCache ,其内封装了 缓存Observable 的逻辑;另一个是applyHystrix Semantics 回调方法,其内封装了断路、资源隔离等核心断路器逻辑。
·HystrixRequestCache 请求援存 HystrixRequestCache 对Observable 进行缓存操作,使用每个命令特有的cacheKey 对Observable 进行缓存,通过ConcurrentHashMap 保存缓存结果以保证线程安全。
HystrixRequestCache 中缓存的并不是直接的Observable , 而是被封装好的HystrixCachedObservable。在HystrixCachedObservable 中,通过ReplaySubject 订阅需要缓存的Observable ,保证了缓存的Observable 能够多次执行.
applyHystrixSemantics 断路器判断与获取信号量 在applyHystrixSemantics 回调方法中, 通过AbstractCommand#app ly HystrixSemantics方法声明Observable 。它主要工作是判断断路器是否打开, 以及尝试获取信号量用于执行命令(仅在信号量隔离模式下生效) 在AbstractColilIIland#applyHystrix.Semant ic s 中, 首先通过断路器Hystrix.CircuitBreaker 检查链路中的断路器是否开启, 如果开启的话,执行断路失败逻辑handleShortCircuitViaFallback方法。如果通过断路器的检查, 将会尝试获取信号量。如果不能获取信号量,那么执行信号量获取失败逻辑handleSemaphoreRejection ViaFallback 方法。当上述检查都通过了,才执行exec uteCornmandAndObserve 方法获取执行命令的Observable , 并为该Observable 配置 回调操作, 该回调操作在命令执行结束后以及取消订阅时用于释放信号量。
executeCommandAndObserve 配置执行异常回调方法 executeCommandAndObserve 方法主要用于为执行命令Observable 配置执行失败的回调方法,对执行失败的结果进行记录和处理。
executeCo mm an dWith Specifiedlsolation 配置线程隔离和超时控制 executeCommandW ithSpeci fiedlsolatio n 方法为命令构造了隔离的执行环境,提供两种资源隔离的方式, 线程隔离和信号量隔离;如果Hystrix 配置中开启了超时控制,还会通过Observable#lift 方法将现有的Observable 转化为添加了超时检查的Observable 。
getExecutionObservable 配置被封装的远程调用方法 getUserEx巳cutionObservable 方法将为命令获取在声明HystrixCommand 时被包装的具 体远程调用方法。在getUserExec utionObservable 方法中,通过getExecutionObservable 抽象方法将具体实现延迟到子类中
本篇讲解了
Hystrix封装HystrixCommand
HystrixCommand类结构
异步回调命令的原理解析
下篇解读
异步回调执行命令
断路器逻辑
资源隔离
请求超时监控
失败回滚逻辑的实现原理
微信搜索【码上代码 】每周文章同步更新技术分享,亿级流量分布式系统源码及教程 ,更有1000道互联网架构师面试题 ,30本技术书籍pdf 和一线大厂简历模板 等你
大家好,感谢各位人才 能看到这里的都是您已是佼佼者 我会持续为大家做技术分享 预知下篇如何 请点赞 、收藏 和评论 ,我们下期见!
本文同步分享在 博客“码上代码”(CSDN)。 如有侵权,请联系 support@oschina.cn 删除。 本文参与“OSC源创计划 ”,欢迎正在阅读的你也加入,一起分享。