Spring Cloud Hystrix源码分析
Spring Cloud Hystrix源码分析
Spring Cloud Hystrix源码解读@EnableCircuitBreaker
职责:
- 激活Circuit Breaker
初始化顺序 - @EnableCircuitBreaker
- EnableCircuitBreakerImportSelector
- HystrixCircuitBreakerConfiguration
HystrixCircuitBreakerConfiguration
初始化组件
- HystrixCommandAspect
- HystrixShutdownHook
- HystrixStreamEndpoint:Servlet
- HystrixMetricsPollerConfiguration
Netflix Hystrix源码解读
HystrixCommandAspect
依赖组件
- MetaholderFactory
- HystrixCommandFactory:生成HystriInvokable
-
HystrixInvokable
CommandCollapser GenericObservableCommand GenericCommand
Future实现服务熔断
package com.segumentfault.springcloudlesson9.future; import java.util.Random; import java.util.concurrent.*; /** * 通过 {@link Future} 实现 服务熔断 * */ public class FutureCircuitBreakerDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { // 初始化线程池 ExecutorService executorService = Executors.newSingleThreadExecutor(); RandomCommand command = new RandomCommand(); Future<String> future = executorService.submit(command::run); String result = null; // 100 毫秒超时时间 try { result = future.get(100, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // fallback 方法调用 result = command.fallback(); } System.out.println(result); executorService.shutdown(); } /** * 随机对象 */ private static final Random random = new Random(); /** * 随机事件执行命令 */ public static class RandomCommand implements Command<String> { @Override public String run() throws InterruptedException { long executeTime = random.nextInt(200); // 通过休眠来模拟执行时间 System.out.println("Execute Time : " + executeTime + " ms"); Thread.sleep(executeTime); return "Hello,World"; } @Override public String fallback() { return "Fallback"; } } public interface Command<T> { /** * 正常执行,并且返回结果 * * @return */ T run() throws Exception; /** * 错误时,返回容错结果 * * @return */ T fallback(); } }
RxJava基础
单数据:Single
Single.just("Hello,World") // 仅能发布单个数据 .subscribeOn(Schedulers.io()) // 在 I/O 线程执行 .subscribe(RxJavaDemo::println) // 订阅并且消费数据 ;
多数据:Observable
List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); Observable.from(values) //发布多个数据 .subscribeOn(Schedulers.computation()) // 在 I/O 线程执行 .subscribe(RxJavaDemo::println) // 订阅并且消费数据 ; // 等待线程执行完毕 Thread.sleep(100);
使用标准Reactive模式
List<Integer> values = Arrays.asList(1, 2, 3); Observable.from(values) //发布多个数据 .subscribeOn(Schedulers.newThread()) // 在 newThread 线程执行 .subscribe(value -> { if (value > 2) throw new IllegalStateException("数据不应许大于 2"); //消费数据 println("消费数据:" + value); }, e -> { // 当异常情况,中断执行 println("发生异常 , " + e.getMessage()); }, () -> { // 当整体流程完成时 println("流程执行完成"); }) ; // 等待线程执行完毕 Thread.sleep(100);
Java 9 Flow API
只做了解
package concurrent.java9; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; /** * {@link SubmissionPublisher} * * @author mercyblitz **/ public class SubmissionPublisherDemo { public static void main(String[] args) throws InterruptedException { try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) { //Publisher(100) => A -> B -> C => Done publisher.subscribe(new IntegerSubscriber("A")); publisher.subscribe(new IntegerSubscriber("B")); publisher.subscribe(new IntegerSubscriber("C")); // 提交数据到各个订阅器 publisher.submit(100); } Thread.currentThread().join(1000L); } private static class IntegerSubscriber implements Flow.Subscriber<Integer> { private final String name; private Flow.Subscription subscription; private IntegerSubscriber(String name) { this.name = name; } @Override public void onSubscribe(Flow.Subscription subscription) { System.out.printf( "Thread[%s] Current Subscriber[%s] " + "subscribes subscription[%s]\n", Thread.currentThread().getName(), name, subscription); this.subscription = subscription; subscription.request(1); } @Override public void onNext(Integer item) { System.out.printf( "Thread[%s] Current Subscriber[%s] " + "receives item[%d]\n", Thread.currentThread().getName(), name, item); subscription.request(1); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onComplete() { System.out.printf( "Thread[%s] Current Subscriber[%s] " + "is completed!\n", Thread.currentThread().getName(), name); } } }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Windows Server 2016 配置指南 全系列
配置 和 Linux 平台不一样,往往我们使用 Windows Server 都会选择有 GUI 的图形化版本(高手用 Core 也不用看我半吊子的教程了),而且 Windows 又自身占了一部分内存(防火墙、杀软等等),所以我们选择 Windows 平台的话,配置一定不能低。 推荐最低配置: 1核心 2G,内存实在拮据就只能用 32bit 版本了。 软件 系统是 64bit,那么其他软件虽然也能安装 32bit 的版本,但是 64bit 无论稳定性还是性能都是最好的。 Windows 平台在宣传能稳定用于生产的 Stable 版本下,能使用最新的就用最新的,例如我是 Wordpress 博客,那么能用 PHP7 就用,MariaDB 能上就上最新的 10.1。 像 VC14 运行库相对旧的运行库,稳定性和性能都会提升。 PHP7 提供了完整的 64bit 支持。MaridaDB 10.1 在 SSD 的表现下会更出色等等。 排序 一、系统设置[[Windows Server 2016 配置指南 之 设置虚拟内存] ](https://yq.aliyun.com/articles/60...
- 下一篇
PHP技术月刊第1期:使用 Phan 为你的 PHP 项目保驾护航
云栖社区“世界上最好的编程语言”——PHP开始发布技术月刊啦。PHP技术月刊将会为大家介绍最新的PHP技术与动态、预告活动、最热问答以及技术直播等,欢迎大家订阅。 最新动态 使用 Phan 为你的 PHP 项目保驾护航 - 代码静态扫描 很多时候,最大的优势在某些情况下就会变成最大的劣势。PHP 语法非常灵活,也不用编译。但是在项目比较复杂的时候,可能会导致一些意想不到的 bug,本文就为大家介绍了如何使用代码静态扫描技术为PHP项目保驾护航。 一次 group by + order by 性能优化分析 最近通过一个日志表做排行的时候发现特别卡,最后问题得到了解决,梳理一些索引和MySQL执行过程的经验,但是最后还是有5个谜题没解开,希望大家帮忙解答下。本文主要包含如下知识点:用数据说话证明慢日志的扫描行数到底是如何统计出来的;从 grou
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- CentOS8编译安装MySQL8.0.19
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Hadoop3单机部署,实现最简伪集群
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果