您现在的位置是:首页 > 文章详情

【SpringCloud技术专题】「Hystrix」(3)超时机制的原理和实现

日期:2021-06-14点击:702

[每日一句]

也许你度过了很糟糕的一天,但这并不代表你会因此度过糟糕的一生。

[温馨提示]

承接上一篇文章🏹「Hystrix」(2)参数配置的详细介绍

[背景介绍]

  • 分布式系统的规模和复杂度不断增加,随着而来的是对分布式系统可用性的要求越来越高。在各种高可用设计模式中,【熔断、隔离、降级、限流】是经常被使用的。而相关的技术,Hystrix本身早已算不上什么新技术,但它却是最经典的技术体系!。

  • Hystrix以实现熔断降级的设计,从而提高了系统的可用性。

  • Hystrix是一个在调用端上,实现断路器模式,以及隔舱模式,通过避免级联故障,提高系统容错能力,从而实现高可用设计的一个Java服务组件库。

  • Hystrix实现了资源隔离机制

前提介绍

Hystrix的超时检测本质上通过启动单独线程去检测的,线程的执行的时间刚好就是任务超时的时间,本质上就是这么个简单的逻辑。

Hystrix超时后会抛出一个HystrixTimeoutException的异常。

超时检测逻辑

Hystrix的超时包括注册过程和执行过程两个,注册过程如下:

  • 执行lift(new HystrixObservableTimeoutOperator<R>(_cmd))关联超时检测任务

  • 在HystrixObservableTimeoutOperator类中,new TimerListener()负责创建检测任务,HystrixTimer.getInstance().addTimerListener(listener)负责关联定时任务

    • 在HystrixObservableTimeoutOperator类中,addTimerListener通过java的定时任务服务scheduleAtFixedRate在延迟超时时间后执行

Hystrix的超时执行过程如下:

  1. 在超时后执行listener.tick()方法后执行类TimerListener的tick方法

  2. 在TimerListener类的tick方法中执行timeoutRunnable.run()后执行HystrixContextRunnable的run方法

  3. 在HystrixContextRunnable类run方法中执行child.onError(new HystrixTimeoutException())实现超时

  4. executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> { final AbstractCommand<R> originalCommand; public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) { this.originalCommand = originalCommand; } @Override public Subscriber<? super R> call(final Subscriber<? super R> child) { final CompositeSubscription s = new CompositeSubscription(); // if the child unsubscribes we unsubscribe our parent as well child.add(s); //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread(); TimerListener listener = new TimerListener() { @Override public void tick() { if(originalCommand.isCommandTimedOut .compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { // report timeout failure originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); // shut down the original request s.unsubscribe(); final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable( originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); timeoutRunnable.run(); } } @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } }; final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); // set externally so execute/queue can see this originalCommand.timeoutTimer.set(tl); /** * If this subscriber receives values it means the parent succeeded/completed */ Subscriber<R> parent = new Subscriber<R>() { @Override public void onCompleted() { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onCompleted(); } } @Override public void onError(Throwable e) { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onError(e); } } @Override public void onNext(R v) { if (isNotTimedOut()) { child.onNext(v); } } private boolean isNotTimedOut() { // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED); } }; // if s is unsubscribed we want to unsubscribe the parent s.add(parent); return parent; } } public Reference<TimerListener> addTimerListener(final TimerListener listener) { startThreadIfNeeded(); // add the listener Runnable r = new Runnable() { @Override public void run() { try { listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } } }; //这里直接简单粗暴的scheduleAtFixedRate以超时时间作为周期去判断是否执行完成 ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); return new TimerReference(listener, f); } public class HystrixContextRunnable implements Runnable { private final Callable<Void> actual; private final HystrixRequestContext parentThreadState; public HystrixContextRunnable(Runnable actual) { this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual); } public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) { this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual); } public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) { this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() { @Override public Void call() throws Exception { actual.run(); return null; } }); this.parentThreadState = hystrixRequestContext; } @Override public void run() { HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { // set the state of this thread to that of its parent HystrixRequestContext.setContextOnCurrentThread(parentThreadState); // execute actual Callable with the state of the parent try { actual.call(); } catch (Exception e) { throw new RuntimeException(e); } } finally { // restore this thread back to its original state HystrixRequestContext.setContextOnCurrentThread(existingState); } } } 
原文链接:https://my.oschina.net/liboware/blog/5077648
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章