Eureka 源码分析之 Eureka Client
文章首发于微信公众号《程序员果果》
地址:https://mp.weixin.qq.com/s/47TUd96NMz67_PCDyvyInQ
简介
Eureka是一种基于REST(Representational State Transfer)的服务,主要用于AWS云,用于定位服务,以实现中间层服务器的负载平衡和故障转移。我们将此服务称为Eureka Server。Eureka还附带了一个基于Java的客户端组件Eureka Client,它使与服务的交互变得更加容易。客户端还有一个内置的负载均衡器,可以进行基本的循环负载均衡。在Netflix,一个更复杂的负载均衡器包含Eureka基于流量,资源使用,错误条件等多种因素提供加权负载平衡,以提供卓越的弹性。
先看一张 github 上 Netflix Eureka 的一架构图,如下:
从图可以看出在这个体系中,有2个角色,即Eureka Server和Eureka Client。而Eureka Client又分为Applicaton Service和Application Client,即服务提供者何服务消费者。 每个区域有一个Eureka集群,并且每个区域至少有一个eureka服务器可以处理区域故障,以防服务器瘫痪。
Eureka Client 在 Eureka Server 注册,然后Eureka Client 每30秒向 Eureka Server 发送一次心跳来更新一次租约。如果 Eureka Client 无法续订租约几次,则会在大约90秒内 Eureka Server 将其从服务器注册表中删除。注册信息和续订将复制到群集中的所有 Eureka Server 节点。来自任何区域的客户端都可以查找注册表信息(每30秒发生一次)根据这些注册表信息,Application Client 可以远程调用 Applicaton Service 来消费服务。
源码分析
基于Spring Cloud的 eureka 的 client 端在启动类上加上 @EnableDiscoveryClient 注解,就可以 用 NetFlix 提供的 Eureka client。下面就以 @EnableDiscoveryClient 为入口,进行Eureka Client的源码分析。
@EnableDiscoveryClient,通过源码可以发现这是一个标记注解:
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
boolean autoRegister() default true;
}
通过注释可以知道 @EnableDiscoveryClient 注解是用来 启用 DiscoveryClient 的实现,DiscoveryClient接口代码如下:
public interface DiscoveryClient {
String description();
List<ServiceInstance> getInstances(String serviceId);
List<String> getServices();
}
接口说明:
- description():实现描述。
- getInstances(String serviceId):获取与特定serviceId关联的所有ServiceInstance
- getServices():返回所有已知的服务ID
DiscoveryClient 接口的实现结构图:
- EurekaDiscoveryClient:Eureka 的 DiscoveryClient 实现类。
- CompositeDiscoveryClient:用于排序可用客户端的发现客户端的顺序。
- NoopDiscoveryClient:什么都不做的服务发现实现类,已经被废弃。
- SimpleDiscoveryClient:简单的服务发现实现类 SimpleDiscoveryClient,具体的服务实例从 SimpleDiscoveryProperties 配置中获取。
EurekaDiscoveryClient 是 Eureka 对 DiscoveryClient接口的实现,代码如下:
public class EurekaDiscoveryClient implements DiscoveryClient {
public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
private final EurekaInstanceConfig config;
private final EurekaClient eurekaClient;
public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
this.config = config;
this.eurekaClient = eurekaClient;
}
@Override
public String description() {
return DESCRIPTION;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
@Override
public List<String> getServices() {
Applications applications = this.eurekaClient.getApplications();
if (applications == null) {
return Collections.emptyList();
}
List<Application> registered = applications.getRegisteredApplications();
List<String> names = new ArrayList<>();
for (Application app : registered) {
if (app.getInstances().isEmpty()) {
continue;
}
names.add(app.getName().toLowerCase());
}
return names;
}
}
从代码可以看出 EurekaDiscoveryClient 实现了 DiscoveryClient 定义的规范接口,真正实现发现服务的是 EurekaClient,下面是 EurekaClient 依赖结构图:
EurekaClient 唯一实现类 DiscoveryClient,DiscoveryClient 的构造方法如下:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//省略...
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//省略...
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
//省略...
}
可以看到这个构造方法里面,主要做了下面几件事:
- 创建了scheduler定时任务的线程池,heartbeatExecutor心跳检查线程池(服务续约),cacheRefreshExecutor(服务获取)
- 然后initScheduledTasks()开启上面三个线程池,往上面3个线程池分别添加相应任务。然后创建了一个instanceInfoReplicator(Runnable任务),然后调用InstanceInfoReplicator.start方法,把这个任务放进上面scheduler定时任务线程池(服务注册并更新)。
服务注册(Registry)
上面说了,initScheduledTasks()方法中调用了InstanceInfoReplicator.start()方法,InstanceInfoReplicator 的 run()方法代码如下:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
发现 InstanceInfoReplicator的run方法,run方法中会调用DiscoveryClient的register方法。DiscoveryClient 的 register方法 代码如下:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
最终又经过一系列调用,最终会调用到AbstractJerseyEurekaHttpClient的register方法,代码如下:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
可以看到最终通过http rest请求eureka server端,把应用自身的InstanceInfo实例注册给server端,我们再来完整梳理一下服务注册流程:
Renew服务续约
服务续约和服务注册非常类似,HeartbeatThread 代码如下:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
//更新最后一次心跳的时间
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
// 续约的主方法
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
发送心跳 ,请求eureka server 端 ,如果接口返回值为404,就是说服务不存在,那么重新走注册流程。
如果接口返回值为404,就是说不存在,从来没有注册过,那么重新走注册流程。
服务续约流程如下图:
服务下线cancel
在服务shutdown的时候,需要及时通知服务端把自己剔除,以避免客户端调用已经下线的服务,shutdown()方法代码如下:
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// 关闭各种定时任务
// 关闭刷新实例信息/注册的定时任务
// 关闭续约(心跳)的定时任务
// 关闭获取注册信息的定时任务
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
// 更改实例状态,使实例不再接收流量
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
//向EurekaServer端发送下线请求
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
先关闭各种定时任务,然后向eureka server 发送服务下线通知。服务下线流程如下图:
参考
https://github.com/Netflix/eureka/wiki
http://yeming.me/2016/12/01/eureka1/
http://blog.didispace.com/springcloud-sourcecode-eureka/
https://www.jianshu.com/p/71a8bdbf03f4

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
2019 年了,为什么我还在用 jQuery?
译者按: 看来 jQuery 还是有一些用武之地的。 原文: Why I'm Still Using jQuery in 2019 译者: Fundebug 为了保证可读性,本文采用意译而非直译。翻译仅供学习探讨,不代表 Fundebug 观点。 许多人都在提倡: “直接用原生的 JavaScript 就好了,不需要 jQuery 了”。 You might not need jQuery尝试告诉我们,摆脱 jQuery 是一件很容易的事情。但是,它的第一个例子恰恰告诉我们用 jQuery 其实也不错,因为我们写了 10 行原生的 JavaScript 代码,其实只需要 1 行 jQuery 代码就够了。 很多 JavaScript 的 API,尤其是 DOM 相关的 API,挑战了我的审美哲学,直白点说,我觉得它们太糟糕了!el.insertAdjacentElement('afterend', other)当然也可以用,但是$(el).after(other)更加简洁。$()函数也没那么好看,我没有特别喜欢,但是它比原生的 API 好太多了。 你们如何获取某个元素的 sibling...
-
下一篇
卷积神经网络学习
来源商业新知网,原标题:如何入手卷积神经网络 卷积神经网络可以算是深度神经网络中很流行的网络了。本文从基础入手,介绍了卷积网络的基本原理以及相关的其它技术,并利用卷积网络做了一个简单项目作为示例参考。首先,我们先看看下面这张照片: 图源:Pix2PixHD 这不是一张真实的照片,你可以新建一个窗口来打开它,放大看看,可以看到马赛克。 实际上,这张照片是由 AI 生成的,是不是看起来很真实? 从 Alex Krizhevsky 及其朋友通过 ImageNet 公布这项技术至今,不过才七年。ImageNet 是一个大规模图像识别竞赛,每年都会举办,识别种类达 1000 多种,从阿拉斯加雪橇犬到厕纸应用尽有。之后,他们又创建了 AlexNet,获得了 ImageNet 竞赛冠军,远超第二名。 这项技术就是卷积神经网络。它是深度神经网络的一个分支,处理图像的效果格外好。 图源:ImageNet 上图是几年来赢得 ImageNet 挑战赛的软件产生的误差率。可以发现,2016 年误差率降到了 5%,已经超越人类水平。 深度学习的引入与其说是改变规则,不如说是在打破规则。 卷积神经网络架构 那么问...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- MySQL数据库在高并发下的优化方案
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果