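Source Code Walkthrough: Demystifying the Full Nacos Service Discovery Process
Reading open-source code is an essential skill for a developer. The previous article, "Source Code Walkthrough: How Does Read-Heavy, Write-Light Nacos Achieve a High-Performance Design?", introduced the "blind guess" approach to reading open-source code and used it to work through the Nacos server source, covering some of its back-end mechanisms and techniques. That article only covered the server-side logic; it did not touch on how an application service, acting as a Nacos client, registers its own node configuration with the Nacos server. As noted at the end of that article, the "blind guess" approach relies on solid experience, so it does not apply when a guess fails or when that experience is missing. This article therefore continues from the previous one with a brute-force alternative, debugging, and traces the whole Nacos service discovery process from entry point to end.
Topics covered: JDK SPI and Spring SPI, the Spring Boot startup process, and more.
Architecture overview
First we need to find where service discovery starts, that is, where the very first request is issued. It has to be somewhere in our application service. So how do we use Nacos? Usage is simple and takes only the following steps: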
1. Add the Nacos dependencies
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- OpenFeign is used here; other solutions work as well -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
2. Configure the server address
spring:
  application:
    name: app-server
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
3. Taking OpenFeign as an example, call the RPC service from code with load balancing
package com.demo.client;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;

/**
 * @author Old Apple
 */
@FeignClient(value = "app-server", path = "/demo")
public interface HelloRpcClient {
    /**
     * Test endpoint "hello".
     * @return test output
     */
    @GetMapping("/hello")
    String hello();
}

// DemoServiceImpl.java (a separate file; its package declaration is omitted here)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Consumer
 */
@Service
public class DemoServiceImpl {
    @Autowired
    private HelloRpcClient helloRpcClient;

    public String hello() {
        return helloRpcClient.hello();
    }
}
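So which roles take part in the whole RPC call?
- Consumer, the service consumer;
- Provider, the service provider;
- Service Registry Center, the service registry;
- Others: the RPC protocol, the load-balancing algorithm, and so on.
A service provider registers its node information with the registry and is health-checked at regular intervals to make sure it is still available. A service consumer fetches the providers' node information from the registry, including IP address, port, and weight. Since there are usually several providers, a load-balancing algorithm is needed to decide which one to call.
Now let us start from the Provider and see how it publishes itself to the registry. Our business code never explicitly calls any Nacos API, yet the service is registered automatically at startup. How does that happen? When Spring Boot starts, it runs other steps besides our own code, and the Nacos API is called during auto-configuration. Auto-configuration is built on the SPI mechanism, so we first need to understand what SPI is.
What is Spring SPI?
SPI stands for Service Provider Interface, a mechanism for discovering service implementations. JDK SPI looks for files in the META-INF/services folder on the classpath and automatically loads the classes named in them. Spring SPI differs from JDK SPI: it scans every spring.factories file, finds the matching key, and loads the classes listed under it. In spring-cloud-starter-alibaba-nacos-discovery-2021.1.jar we find spring.factories with the following content: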
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
At startup, Spring Boot scans this file and loads the Java classes listed in it. Judging by the name, com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration is the one related to service registration.
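To make the key-to-classes lookup concrete, here is a minimal sketch that reads spring.factories-style content and lists the class names under one key. It is not Spring's actual loader; the class FactoriesFileSketch and the method classNamesFor are names made up for this illustration, and it only relies on java.util.Properties understanding the trailing-backslash line continuation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustrative only: NOT Spring's real loader, just the same idea in miniature.
final class FactoriesFileSketch {

    /** Returns the class names listed under the given key in spring.factories-style content. */
    static List<String> classNamesFor(String factoriesContent, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash into one value.
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        // The value is a comma-separated list of fully qualified class names.
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String content =
                "org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\\n"
              + "com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\\\n"
              + "com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration\n";
        for (String name : classNamesFor(content,
                "org.springframework.boot.autoconfigure.EnableAutoConfiguration")) {
            System.out.println(name);
        }
    }
}
```

A real loader would additionally merge the files found in every jar on the classpath and instantiate the classes; the sketch stops at the name lookup.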
Inside that class, the code declares a NacosAutoServiceRegistration bean, which is created at startup.
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
        NacosServiceRegistry registry,
        AutoServiceRegistrationProperties autoServiceRegistrationProperties,
        NacosRegistration registration) {
    return new NacosAutoServiceRegistration(registry,
            autoServiceRegistrationProperties, registration);
}
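Next, open the NacosAutoServiceRegistration class. As shown in the figure, it has a register method; set a breakpoint there:
Start Spring Boot and the breakpoint is indeed hit here, which shows that this is where the call to publish to the registry begins. The class calls its parent's register method; take a look at the class's constructor:
It passes serviceRegistry to the parent class's constructor, and super.register() ultimately calls the register method of serviceRegistry. The serviceRegistry instance is built in the NacosServiceRegistryAutoConfiguration class, as shown in the figure, so the registration logic further down should be in the NacosServiceRegistry class.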
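The delegation chain described above can be sketched in plain Java. All names here (AutoRegistrationSketch, Registry, BaseAutoRegistration, NacosLikeAutoRegistration, onApplicationStarted) are simplified stand-ins invented for this illustration; they are not the Spring Cloud or Nacos classes, which do considerably more.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins only; the real classes live in Spring Cloud and Spring Cloud Alibaba.
final class AutoRegistrationSketch {

    /** Simplified stand-in for a service registry abstraction. */
    interface Registry {
        void register(String serviceId);
    }

    /** Stand-in for the parent class: it owns the registry and delegates to it. */
    static class BaseAutoRegistration {
        private final Registry registry;

        BaseAutoRegistration(Registry registry) {
            this.registry = registry;
        }

        protected void register(String serviceId) {
            registry.register(serviceId);
        }
    }

    /** Stand-in for the subclass: passes the registry up and calls super.register(). */
    static class NacosLikeAutoRegistration extends BaseAutoRegistration {
        NacosLikeAutoRegistration(Registry registry) {
            super(registry);
        }

        @Override
        protected void register(String serviceId) {
            // Only the delegation is shown here.
            super.register(serviceId);
        }

        /** Hypothetical hook that a framework would invoke once the application is up. */
        void onApplicationStarted(String serviceId) {
            register(serviceId);
        }
    }

    public static void main(String[] args) {
        List<String> registered = new ArrayList<>();
        NacosLikeAutoRegistration auto = new NacosLikeAutoRegistration(registered::add);
        auto.onApplicationStarted("app-server");
        System.out.println(registered);
    }
}
```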
How a service provider publishes itself
Open the NacosServiceRegistry class:
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.nacos.registry;
import java.util.List;
import java.util.Properties;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import org.springframework.util.StringUtils;
import static org.springframework.util.ReflectionUtils.rethrowRuntimeException;
/**
* @author xiaojing
* @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
* @author <a href="mailto:78552423@qq.com">eshun</a>
*/
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
private static final String STATUS_UP = "UP";
private static final String STATUS_DOWN = "DOWN";
private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);
private final NacosDiscoveryProperties nacosDiscoveryProperties;
@Autowired
private NacosServiceManager nacosServiceManager;
public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
}
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
@Override
public void deregister(Registration registration) {
log.info("De-registering from Nacos Server now...");
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No dom to de-register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
try {
namingService.deregisterInstance(serviceId, group, registration.getHost(),
registration.getPort(), nacosDiscoveryProperties.getClusterName());
}
catch (Exception e) {
log.error("ERR_NACOS_DEREGISTER, de-register failed...{},",
registration.toString(), e);
}
log.info("De-registration finished.");
}
@Override
public void close() {
try {
nacosServiceManager.nacosServiceShutDown();
}
catch (NacosException e) {
log.error("Nacos namingService shutDown failed", e);
}
}
@Override
public void setStatus(Registration registration, String status) {
if (!STATUS_UP.equalsIgnoreCase(status)
&& !STATUS_DOWN.equalsIgnoreCase(status)) {
log.warn("can't support status {},please choose UP or DOWN", status);
return;
}
String serviceId = registration.getServiceId();
Instance instance = getNacosInstanceFromRegistration(registration);
if (STATUS_DOWN.equalsIgnoreCase(status)) {
instance.setEnabled(false);
}
else {
instance.setEnabled(true);
}
try {
Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
nacosServiceManager.getNamingMaintainService(nacosProperties).updateInstance(
serviceId, nacosDiscoveryProperties.getGroup(), instance);
}
catch (Exception e) {
throw new RuntimeException("update nacos instance status fail", e);
}
}
@Override
public Object getStatus(Registration registration) {
String serviceName = registration.getServiceId();
try {
List<Instance> instances = namingService().getAllInstances(serviceName);
for (Instance instance : instances) {
if (instance.getIp().equalsIgnoreCase(nacosDiscoveryProperties.getIp())
&& instance.getPort() == nacosDiscoveryProperties.getPort()) {
return instance.isEnabled() ? "UP" : "DOWN";
}
}
}
catch (Exception e) {
log.error("get all instance of {} error,", serviceName, e);
}
return null;
}
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(nacosDiscoveryProperties.getWeight());
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
return instance;
}
private NamingService namingService() {
return nacosServiceManager
.getNamingService(nacosDiscoveryProperties.getNacosProperties());
}
}
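The register method first builds the registration information for the current node instance, then calls registerInstance on the Nacos client's NamingService.
So what does NamingService do? It is implemented by NacosNamingService.
Inside NacosNamingService, the HTTP request is sent through NamingProxy and is then handled by a Controller on the Nacos server.
As the breakpoint screenshot shows, the HTTP path requested is /nacos/v1/ns/instance. Switching to the Nacos source project, it is easy to find the Controller class mapped to this path.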
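As a rough picture of what ends up on the wire, the sketch below assembles a registration URL for the /nacos/v1/ns/instance path. The parameter names (serviceName, groupName, ip, port, weight, ephemeral) follow the Nacos v1 Open API as I understand it and should be checked against the documentation for your version; the class and method names are made up, and the sketch only builds the URL rather than sending the request (the real client sends a POST and adds more parameters).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative only: builds the URL, does not perform the HTTP POST.
final class RegisterRequestSketch {

    static String buildRegisterUrl(String serverAddr, String serviceName, String groupName,
                                   String ip, int port, double weight, boolean ephemeral) {
        // LinkedHashMap keeps the parameters in insertion order, which makes the result predictable.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("weight", String.valueOf(weight));
        params.put("ephemeral", String.valueOf(ephemeral));

        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
        }
        return "http://" + serverAddr + "/nacos/v1/ns/instance?" + query;
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildRegisterUrl(
                "127.0.0.1:8848", "app-server", "DEFAULT_GROUP", "10.0.0.5", 8080, 1.0, true));
    }
}
```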
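From the Controller onward, see the walkthrough in "Source Code Walkthrough: How Does Read-Heavy, Write-Light Nacos Achieve a High-Performance Design?"; this article continues with the client-side logic.
Health checks and heartbeats
The previous section walked through how the client publishes a service to Nacos. After publishing, the client performs a health check at regular intervals, that is, it periodically sends a heartbeat request to the Nacos server to prove it is still alive. If the Nacos server sees that a service has sent no heartbeat for longer than a certain period, it considers that service abnormal.
As shown in the figure, when a non-persistent (ephemeral) service is registered, the client sends a heartbeat at regular intervals.
Now step into beatReactor; the code is as follows: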
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.beat;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.CommonParams;
import com.alibaba.nacos.api.naming.NamingResponseCode;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;
/**
* Beat reactor.
*
* @author harold
*/
public class BeatReactor implements Closeable {
private final ScheduledExecutorService executorService;
private final NamingProxy serverProxy;
private boolean lightBeatEnabled = false;
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
public BeatReactor(NamingProxy serverProxy) {
this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
/**
* Add beat information.
*
* @param serviceName service name
* @param beatInfo beat information
*/
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
/**
* Remove beat information.
*
* @param serviceName service name
* @param ip ip of beat information
* @param port port of beat information
*/
public void removeBeatInfo(String serviceName, String ip, int port) {
NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);
BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));
if (beatInfo == null) {
return;
}
beatInfo.setStopped(true);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
/**
* Build new beat information.
*
* @param instance instance
* @return new beat information
*/
public BeatInfo buildBeatInfo(Instance instance) {
return buildBeatInfo(instance.getServiceName(), instance);
}
/**
* Build new beat information.
*
* @param groupedServiceName service name with group name, format: ${groupName}@@${serviceName}
* @param instance instance
* @return new beat information
*/
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
public String buildKey(String serviceName, String ip, int port) {
return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
}
@Override
public void shutdown() throws NacosException {
String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className);
ThreadUtils.shutdownThreadPool(executorService, NAMING_LOGGER);
NAMING_LOGGER.info("{} do shutdown stop", className);
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
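As the code shows, a delayed task is used: each BeatTask sends one HTTP heartbeat and then schedules the next BeatTask, so requests repeat at an interval. The initial interval is beatInfo.getPeriod(), which is filled from instance.getInstanceHeartBeatInterval(), and it is replaced by the clientBeatInterval value in the server's response whenever that value is greater than 0. If the server answers with RESOURCE_NOT_FOUND, the task registers the instance again. The code here is the health check for ephemeral instances, but Nacos actually distinguishes ephemeral instances from persistent ones. An ephemeral instance exists in the registry only temporarily and is evicted when the service goes offline or becomes unavailable. It keeps a heartbeat with the registry; when the registry has received no heartbeat from the client for some time, it marks the instance unhealthy and, after a further period, removes it. A persistent instance stays in the registry until it is deleted, may not even know the registry exists, and does not report heartbeats on its own, so in that case the registry has to probe it actively.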
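The self-rescheduling pattern used by BeatTask can be reduced to a few lines. The sketch below is an assumption-laden miniature, not the Nacos client: HeartbeatSketch, the LongSupplier callback standing in for the HTTP call, and the awaitBeats helper are all invented for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Illustrative only: one-shot delayed tasks that re-schedule themselves.
final class HeartbeatSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "beat-sender-sketch");
                t.setDaemon(true);
                return t;
            });
    // Stand-in for the HTTP call; returns the interval suggested by the server, or <= 0 for none.
    private final LongSupplier sendBeat;
    private final long defaultPeriodMillis;
    private volatile boolean stopped;

    HeartbeatSketch(LongSupplier sendBeat, long defaultPeriodMillis) {
        this.sendBeat = sendBeat;
        this.defaultPeriodMillis = defaultPeriodMillis;
    }

    void start() {
        executor.schedule(this::beatOnce, defaultPeriodMillis, TimeUnit.MILLISECONDS);
    }

    private void beatOnce() {
        if (stopped) {
            return;
        }
        long next = defaultPeriodMillis;
        try {
            long suggested = sendBeat.getAsLong();
            if (suggested > 0) {
                next = suggested; // the server may override the interval
            }
        } catch (RuntimeException e) {
            // A failed beat must not break the chain; the next beat is still scheduled.
        }
        try {
            executor.schedule(this::beatOnce, next, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the check and the schedule call.
        }
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    /** Demo helper: true if the expected number of beats was observed within the timeout. */
    static boolean awaitBeats(int expected, long periodMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        HeartbeatSketch sketch = new HeartbeatSketch(() -> {
            latch.countDown();
            return 0L;
        }, periodMillis);
        sketch.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitBeats(3, 50, 5000));
    }
}
```

Scheduling one-shot tasks instead of a fixed-rate task is what allows the interval to change between beats.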
For the architecture behind this part, see the official documentation: https://www.yuque.com/nacos/ebook/qrkw0g#xXHZG
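Switching to the server code, we follow the URL the client calls to the corresponding Controller and trace further down, only to find that the server merely updates a timestamp. However, a thread keeps checking the live instances:
It first marks the instance as unhealthy, and removes it if a heartbeat still has not arrived after that.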
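The two-stage decision (unhealthy first, removed later) can be written as a small pure function. This is a sketch under assumptions: ExpiryCheckSketch and classify are made-up names, and the two thresholds are illustrative values chosen for the example rather than figures taken from the server code discussed here.

```java
// Illustrative only: classifies an instance by how long it has been silent.
final class ExpiryCheckSketch {

    enum State { HEALTHY, UNHEALTHY, REMOVED }

    // Assumed thresholds for illustration; the real values are configurable on the server side.
    static final long UNHEALTHY_AFTER_MILLIS = 15_000L;
    static final long REMOVE_AFTER_MILLIS = 30_000L;

    static State classify(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > REMOVE_AFTER_MILLIS) {
            return State.REMOVED;   // still no heartbeat after the longer timeout
        }
        if (silence > UNHEALTHY_AFTER_MILLIS) {
            return State.UNHEALTHY; // first stage: keep the instance but mark it unhealthy
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(classify(now - 1_000L, now));
        System.out.println(classify(now - 20_000L, now));
        System.out.println(classify(now - 40_000L, now));
    }
}
```

A periodic server-side task would call such a check for every ephemeral instance, while each incoming heartbeat only has to refresh lastBeatMillis.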
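RPC requests and load balancing
When a service consumer (Consumer) calls a service provider (Provider), it needs to know which provider to call, because there may be many of them. The consumer therefore needs a strategy for picking one available provider. That strategy is the load-balancing algorithm; common ones include random, weighted random, round robin, weighted round robin, and least connections. The architecture above uses OpenFeign, which currently load-balances through LoadBalancer by default, and the call eventually reaches the selectInstances method of the Nacos client.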
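Of the algorithms listed, weighted random is easy to show in a few lines. The sketch below is a generic illustration, not the implementation used by LoadBalancer or by the Nacos client; WeightedRandomSketch, Node, and choose are made-up names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Illustrative only: picks a node with probability proportional to its weight.
final class WeightedRandomSketch {

    static final class Node {
        final String address;
        final double weight;

        Node(String address, double weight) {
            this.address = address;
            this.weight = weight;
        }
    }

    static Node choose(List<Node> nodes, Random random) {
        double total = 0;
        for (Node node : nodes) {
            total += Math.max(0, node.weight);
        }
        if (total <= 0) {
            throw new IllegalArgumentException("no instance with positive weight");
        }
        // Pick a point in [0, total) and walk the nodes until the point falls inside one.
        double point = random.nextDouble() * total;
        Node lastPositive = null;
        for (Node node : nodes) {
            if (node.weight <= 0) {
                continue; // nodes with zero weight are never selected
            }
            lastPositive = node;
            point -= node.weight;
            if (point < 0) {
                return node;
            }
        }
        return lastPositive; // guards against floating-point rounding
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node("10.0.0.5:8080", 1.0), new Node("10.0.0.6:8080", 3.0));
        System.out.println(choose(nodes, new Random()).address);
    }
}
```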
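Here hostReactor subscribes to instance update events from the server, and subscribe is true on this path. When it is not true, healthy instances are queried directly from the Nacos server through the /instance/list URL; that server code is not covered in detail here, since it simply looks instances up in memory by the query conditions. The getServiceInfo method, by contrast, reads straight from local memory, although the local copy is not necessarily fully consistent with the server at any given moment.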
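The two lookup paths can be contrasted in a simplified sketch. Everything here is hypothetical scaffolding (InstanceLookupSketch, onInstancesChanged, the remoteQuery function standing in for the /instance/list call); the real client also handles cache misses, periodic refresh, and failover, none of which is modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative only: local cache for subscribed lookups, remote query otherwise.
final class InstanceLookupSketch {
    private final Map<String, List<String>> localCache = new ConcurrentHashMap<>();
    // Stand-in for the HTTP query against the server.
    private final Function<String, List<String>> remoteQuery;

    InstanceLookupSketch(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    /** Called when the server reports a change; replaces the cached snapshot. */
    void onInstancesChanged(String serviceName, List<String> instances) {
        localCache.put(serviceName, Collections.unmodifiableList(new ArrayList<>(instances)));
    }

    List<String> selectInstances(String serviceName, boolean subscribe) {
        if (subscribe) {
            // Fast path: answer from memory; the snapshot may lag slightly behind the server.
            return localCache.getOrDefault(serviceName, Collections.emptyList());
        }
        // Slow path: ask the server every time.
        return remoteQuery.apply(serviceName);
    }

    public static void main(String[] args) {
        InstanceLookupSketch lookup =
                new InstanceLookupSketch(name -> Arrays.asList("10.0.0.9:8080"));
        lookup.onInstancesChanged("app-server", Arrays.asList("10.0.0.5:8080"));
        System.out.println(lookup.selectInstances("app-server", true));
        System.out.println(lookup.selectInstances("app-server", false));
    }
}
```

Reading from the local snapshot trades strict consistency for latency, which matches the read-heavy usage described in this article.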



