18. 从零开始编写一个类nginx工具, 主动式健康检查源码实现
wmproxy
wmproxy
将用Rust
实现http/https
代理, socks5
代理, 反向代理, 静态文件服务器,后续将实现websocket
代理, 内外网穿透等, 会将实现过程分享出来, 感兴趣的可以一起造个轮子法
项目地址
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
为什么我们需要主动
主动可以让我们掌握好系统的稳定性,假设我们有一条连接不可达,连接超时的判定是5秒,需要检测失败3次才认定为失败,那么此时从我们开始检测,到判定失败需要耗时15秒。
如果此时我们是个高并发的系统,每秒的QPS是1000,我们有三个地址判定,那么此时我们有1/3的失败概率。那么在15秒内,我们会收到15000个请求,会造成5000个请求失败,如果是重要的数据,我们会丢失很多重要数据。
如果此时客户端拥有重试机制,那么客户端在失败的时候会发起重试,而且系统可能会反复的分配到那台不可达的系统,将会造成短时间内请求数激增,可能引发系统的雪崩。
所以此时我们主动知道目标端的系统稳定性极其重要。
网络访问示意图
以下是没有主动健康检查
如果出错的时候,一个请求的平均时长可能会达到(1.4s + 5s) / 2 = (3.2s)
,比正常访问多了(3.2 - 1.4) = 1.8s
,节点的宕机会对系统的稳定性产生较大的影响
以下是主动健康检查,它保证了访问后端服务器组均是正常的状态
服务器2
出错的时候,主动检查已经检查出服务器2
不可用,负载均衡的时候选择已经把服务器2
摘除,所以系统的平均耗时1.4s,系统依然保持稳定
健康检查的种类
在目前的系统中有以下两分类:
- HTTP 请求特定的方法及路径,判断返回是否得到预期的status或者body
- TCP 仅只能测试连通性,如果能连接表示正常,会出现能连接但无服务的情况
健康检查的准备
我们需要从配置中读出所有的需要健康检查的类型,即需要去重,把同一个指向的地址过滤掉 配置有可能被重新加载,所以我们需要预留发送配置的方式(或者后续类似nginx用新开进程的方式则不需要),此处做一个预留。
-
如何去重 像这种简单级别的去重通常用
HashSet
复杂度为O(1)
或者用简单的Vec
复杂度为O(n)
,以SocketAddr
的为键值,判断是否有重复的数据。 -
如何保证不影响主线程 把健康请求的方法移到异步函数,用
tokio::spawn
中处理,在健康检查的情况下保证不影响其它数据处理 -
如果同时处理多个地址的健康检查 每一次健康检查都会在一个异步函数中执行,在我们调用完请求后,我们会对当前该异步进行
tokio::time::sleep
以让出当前CPU。 -
如何按指定间隔时间请求 因为每一次健康请求都是在异步函数中,我们不确认之前的异步是否完成,所以我们在每次请求前都记录
last_request
,我们在请求前调用HealthCheck::check_can_request
判断当前是否可以发送请求来保证间隔时间内不多次请求造成服务器的压力。 -
超时连接判定处理 利用
tokio::time::timeout
和future
做组合,等超时的时候直接按错误处理
部分实现源码
主要源码定义在check/active.rs
中,主要的定义两个类
/// 单项健康检查 #[derive(Debug, Clone)] pub struct OneHealth { /// 主动检查地址 pub addr: SocketAddr, /// 主动检查方法, 有http/https/tcp等 pub method: String, /// 每次检查间隔 pub interval: Duration, /// 最后一次记录时间 pub last_record: Instant, } /// 主动式健康检查 pub struct ActiveHealth { /// 所有的健康列表 pub healths: Vec<OneHealth>, /// 接收健康列表,当配置变更时重新载入 pub receiver: Receiver<Vec<OneHealth>>, }
我们在配置的时候获取所有需要主动检查的数据
/// 获取所有待健康检查的列表 pub fn get_health_check(&self) -> Vec<OneHealth> { let mut result = vec![]; let mut already: HashSet<SocketAddr> = HashSet::new(); if let Some(proxy) = &self.proxy { // ... } if let Some(http) = &self.http { // ... } result }
主要的检查源码,所有的最终信息都落在HealthCheck
中的静态变量里:
pub async fn do_check(&self) -> ProxyResult<()> { // 防止短时间内健康检查的连接过多, 做一定的超时处理, 或者等上一条消息处理完毕 if !HealthCheck::check_can_request(&self.addr, self.interval) { return Ok(()) } if self.method.eq_ignore_ascii_case("http") { match tokio::time::timeout(self.interval + Duration::from_secs(1), self.connect_http()).await { Ok(r) => match r { Ok(r) => { if r.status().is_server_error() { log::trace!("主动健康检查:HTTP:{}, 返回失败:{}", self.addr, r.status()); HealthCheck::add_fall_down(self.addr); } else { HealthCheck::add_rise_up(self.addr); } } Err(e) => { log::trace!("主动健康检查:HTTP:{}, 发生错误:{:?}", self.addr, e); HealthCheck::add_fall_down(self.addr); } }, Err(e) => { log::trace!("主动健康检查:HTTP:{}, 发生超时:{:?}", self.addr, e); HealthCheck::add_fall_down(self.addr); }, } } else { match tokio::time::timeout(Duration::from_secs(3), self.connect_http()).await { Ok(r) => { match r { Ok(_) => { HealthCheck::add_rise_up(self.addr); } Err(e) => { log::trace!("主动健康检查:TCP:{}, 发生错误:{:?}", self.addr, e); HealthCheck::add_fall_down(self.addr); } } } Err(e) => { log::trace!("主动健康检查:TCP:{}, 发生超时:{:?}", self.addr, e); HealthCheck::add_fall_down(self.addr); } } } Ok(()) }
结语
主动检查可以及时的更早的发现系统中不稳定的因素,是系统稳定性的基石,也可以通过更早的发现因素来通知运维介入,我们的目的是使系统更稳定,更健壮,处理延时更少。
点击 <font color=green>[关注]</font>,<font color=green>[在看]</font>,<font color=green>[点赞]</font> 是对作者最大的支持
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
什么是虚拟线程?一次启1000万个会OOM吗?
之前,在Java新特性专栏中,我们简单介绍了Java 21正式发布的虚拟线程。 昨天,正好看到一个讲解此内容的视频,非常不错,所以DD这里给大家翻译好了,感兴趣的可以看看。可以进一步了解虚拟线程:《什么是虚拟线程?一次启1000万个会怎么样?》 。该视频采用Chrome插件Youtube中文配音做了翻译+配音处理,如果您平时也有上油管看前沿视频的话,也可以装一个,可以有效的提高你的学习效率 ^_^。 如果您当前环境不适合观看视频,也可以通过下面的文字内容学习。下面内容是我根据视频内容,总结的,会更简洁一些。 什么是虚拟线程 虚拟线程是在Java并发领域添加的一个新概念,那么虚拟线程到底是做什么用的呢? 根据JEP中的内容告诉我们,虚拟线程是一种轻量级线程,可以显著地帮助我们减少编写、维护、观察高吞吐量应用程序的工作量。它的实现目标有以下几点: 每个请求一个线程风格编写的程序,能够以接近最佳硬件利用率进行扩展。 什么是每个请求一个线程的风格? 对于HTTP服务器来说,这意味着每个HTTP请求都由它自己的线程处理。对于关系型数据库服务器来说,这意味着每个SQL事务也都由它自己的线程处理。如...
- 下一篇
带着问题去分析:Spring Bean 生命周期 | 京东物流技术团队
1: Bean在Spring容器中是如何存储和定义的 Bean在Spring中的定义是_org.springframework.beans.factory.config.BeanDefinition_接口,BeanDefinition里面存储的就是我们编写的Java类在Spring中的元数据,包括了以下主要的元数据信息: 1:Scope(Bean类型):包括了单例Bean(Singleton)和多实例Bean(Prototype) 2:BeanClass: Bean的Class类型 3:LazyInit:Bean是否需要延迟加载 4:AutowireMode:自动注入类型 5:DependsOn:Bean所依赖的其他Bean的名称,Spring会先初始化依赖的Bean 6:PropertyValues:Bean的成员变量属性值 7:InitMethodName:Bean的初始化方法名称 8:DestroyMethodName:Bean的销毁方法名称 同时BeanDefinition是存储到_org.springframework.beans.factory.support.Defaul...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G
- 2048小游戏-低调大师作品
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Docker安装Oracle12C,快速搭建Oracle学习环境