您现在的位置是:首页 > 文章详情

Dubbo服务暴露与注册

日期:2019-06-16点击:333

        前面的文章中,我们讲解了Dubbo是如何进行配置的属性的初始化的,并且讲到,Dubbo最终会将所有的属性参数都封装为一个URL对象,从而以这个URL对象为基准传递参数。本文则主要讲解Dubbo是如何基于URL对象进行服务的暴露与注册的。

        首先需要说明的一点是,服务的暴露与注册是两个不同的概念。在Dubbo中,微服务之间的交互默认是通过Netty进行的,而服务之间的通信是基于TCP以全双工的方式进行的。那么也就是说,每个服务都会存在一个ip和port。所谓的服务暴露就是指根据配置将当前服务使用Netty绑定一个本地的端口号(对于消费者而言,则是尝试连接目标服务的ip和端口)。至于注册,由于微服务架构中对于新添加的服务,需要一定的机制来通知消费者,有新的服务可用,或者对于某些下线的服务,也需要通知消费者,将这个已经下线的服务给移除。Dubbo中服务的注册与发现默认是委托给zookeeper来进行的。本文主要讲解服务的暴露与注册的整体实现结构,至于服务暴露和注册时所需要注意的详细细节,则在后面的文章中进行讲解。

1. 服务的暴露

        服务的暴露的入口位置主要在RegistryProtocol.export()方法中,该方法首先会进行服务的暴露,然后会进行服务的注册。如下是该方法的源码:

@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 获取服务注册相关的配置数据 URL registryUrl = getRegistryUrl(originInvoker); // 获取provider相关的配置数据 URL providerUrl = getProviderUrl(originInvoker); // 对provider的部分配置信息进行覆盖,重写的工作主要是委托给Configurator进行, // 这里OverrideListener的作用主要是在当前服务的配置信息发生更改时,对原有的配置进行重写, // 并且会判断是否需要对当前的服务进行重新暴露 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); // 进行服务的本地暴露,本质上就是根据配置使用Netty绑定本地的某个端口,从而完成服务暴露工作 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // 根据配置获取对应的Registry对象,常见的有ZookeeperRegistry和RedisRegistry,默认使用的是 // ZookeeperRegistry,本文则以Zookeeper为例进行讲解 final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // 将当前的Invoker对象注册到一个全局的providerInvokers中进行缓存, // 该Map对象保存了所有的已经暴露了的服务 ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 除非主动配置不进行注册,那么这里将会返回true boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // 进行服务注册的代码,主要是通过Zookeeper的客户端CuratorFramework进行服务的注册 register(registryUrl, registeredProviderUrl); // 将当前Invoker标识为已经注册完成 providerInvokerWrapper.setReg(true); } // 注册配置被更改的监听事件,将配置被更改时将会触发相应的listener registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 设置相关的URL对象,并且使用DestroyableExporter对exporter进行封装返回 exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); return new DestroyableExporter<>(exporter); } 

        上面的代码中,主要完成了三部分的工作:

  • 将服务与本地的某个端口号进行绑定,从而实现服务暴露的功能;
  • 根据配置得到一个服务注册对象Registry,然后对其进行注册;
  • 创建一个配置被重写的监听器,并且注册该监听器,从而实现配置被重写时能够动态的使用新的配置进行服务的配置。

        对于服务的暴露,主要是在doLocalExport()方法中,我们继续阅读其源码:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { // 获取当前Invoker对应的key,默认为group/interface/version的格式 String key = getCacheKey(originInvoker); // 这一段代码看起来比较复杂,其实本质上还是protocol.export()方法的调用,该方法就是进行服务暴露的代码, // 而ExporterChangeableWrapper的主要作用则是进行unexport()时的一些清理工作 return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); } 

        doLocalExport()方法的实现比较简单,主要的导出工作还是委托给了protocol.export()方法进行,这里的protocol的类型为DubboProtocol,这里我们直接看其export()方法:

@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); // 这里主要是构建Stub的事件分发器,该分发器用于在消费者端进行Stub事件的分发 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 开启服务 openServer(url); // 该方法的主要作用是对序列化进行优化,其会获取配置的实现了SerializationOptimizer接口的配置类, // 然后通过其getSerializableClasses()方法获取序列化类,通过这些类来进行序列化的优化 optimizeSerialization(url); return exporter; } 

        export()方法主要做了三件事:a. 注册stub事件分发器;b. 开启服务;c. 注册序列化优化器类。这里openServer()方法是用于开启服务的,我们继续阅读其源码:

private void openServer(URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // 这里采用双检查法来判断对应于当前服务的server是否已经创建,如果没有创建, // 则创建一个新的,并且缓存起来 if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // 创建并缓存新服务 serverMap.put(key, createServer(url)); } } } else { server.reset(url); } } } private ExchangeServer createServer(URL url) { url = URLBuilder.from(url) .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)) .addParameter(Constants.CODEC_KEY, DubboCodec.NAME) .build(); // 获取所使用的server类型,默认为netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } // 通过Exchangers.bind()方法进行服务的绑定 ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } // 获取client参数所指定的值,该值指定了当前client所使用的传输层服务,比如netty或mina。 // 然后判断当前SPI所提供的传输层服务是否包含所指定的服务类型,如果不包含,则抛出异常 str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class) .getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; } 

        上面的代码主要是创建ExchangeServer的,使用双检查来检测是否已经存在了对应的服务,如果不存在,则通过Exchangers.bind()方法进行创建。这里最终会将bind()方法的调用委托给HeaderExchanger.bind()方法进行。需要注意的是,上面的代码中传入了一个requestHandler的参数,这是一个ExchangeHandler类型的对象,其主要作用是获取并且调用Invoker,以得到最终的调用结果,这些Handler的作用,我们将在后面的文章中进行讲解,本文主要讲解服务的暴露与注册的过程。下面我们继续阅读HeaderExchanger.bind()方法的源码:

@Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler( new HeaderExchangeHandler(handler)))); } 

        这里的bind()方法主要是创建了三个Handler,并且最后一个Handler将传入的ExchangeHandler包裹起来了。相信读者朋友应该很快就能认识到,这里使用的是责任链模式,这几个handler通过统一的构造函数将下一个handler的实例注入到当前handler中。其实我们也就能够理解,最终通过netty进行的调用过程就是基于这些责任链的。这里我们主要看Transporters.bind()方法的实现原理:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } // 判断传入的Handler是否只有一个,如果只有一个,则直接使用该handler,如果存在多个, // 则使用ChannelHandlerDispatcher将这些handler包裹起来进行分发 ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // 通过配置指定的Transporter进行服务的绑定,这里默认使用的是NettyTransporter return getTransporter().bind(url, handler); } // NettyTransporter @Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { // 在NettyTransporter中进行服务绑定时,其只是创建了一个NettyServer以返回,但实际上在创建该对象的 // 过程中,就完成了Netty服务的绑定。需要注意的是,这里的NettyServer并不是Netty所提供的类,而是 // Dubbo自己封装的一个服务类,其对Netty的服务进行了封装 return new NettyServer(url, listener); } 

        Transporters.bind()方法主要是将服务的绑定过程交由NettyTransporter进行,而其则是创建了一个NettyServer对象,真正的绑定过程就在创建该对象的过程中。下面我们来看其创建的源码:

// AbstractServer public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); // 获取绑定的ip和端口号等信息 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = Constants.ANYHOST_VALUE; } // 在本地绑定指定的ip和端口 bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { // 通过创建的InetSocketAddress对象,将真正的绑定过程交由子类进行 doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } // 这里的DataStore只是一个本地缓存的数据仓库,主要是对一些大对象进行缓存 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); } // NettyServer @Override protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); // 这里就进入了创建netty服务的过程,bossGroup指定的线程数为1,因为只有一个channel用于接收客户端请求, // 而workerGroup线程数则指定为配置文件所设置的线程数,这些线程主要用于进行请求的处理 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter( Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // 创建NettyServerHandler,这个handler就是用于处理请求用的handler,但是前面我们也讲到了, // Dubbo使用了一个handler的责任链来进行消息的处理,第二个参数this就是这个链的链头。需要注意的是, // Netty本身提供的责任链与Dubbo这里使用的责任链是不同的,Dubbo只是使用了Netty的链的一个节点来 // 处理Dubbo所创建的链,这样Dubbo的链其实是可以在多种服务复用的,比如Mina final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); // 这里是标准的创建Netty的BootstrapServer的过程 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline() // 添加用于解码的handler .addLast("decoder", adapter.getDecoder()) // 添加用于编码的handler .addLast("encoder", adapter.getEncoder()) // 添加用于进行心跳监测的handler .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) // 将处理请求的handler添加到pipeline中 .addLast("handler", nettyServerHandler); } }); // 进行服务的绑定 ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); } 

        上面的代码就是一个标准的使用Netty进行服务绑定的代码,关于Netty的使用,读者朋友可以阅读Netty Reactor模式实现原理详解

2. 服务的注册

        对于服务的注册,前面我们已经讲到,入口主要在RegistryProtocol.export()方法中,而调用入口则是通过其register()方法进行的,这里我们来看一下该方法的调用过程:

public void register(URL registryUrl, URL registeredProviderUrl) { // 通过RegistryFactory获取一个Registry对象,该对象的主要作用是进行服务的注册, // 这里默认返回的是ZookeeperRegistry Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); } 

        这里主要是根据配置获取一个Registry对象,我们继续阅读其register()方法的源码:

// FailbackRegistry @Override public void register(URL url) { // 将当前URL对象保存到已注册的URL对象列表中 super.register(url); // 移除之前注册失败的记录 removeFailedRegistered(url); removeFailedUnregistered(url); try { // 将真正的注册过程委托给ZookeeperRegistry进行 doRegister(url); } catch (Exception e) { Throwable t = e; // 下面的过程主要是在注册失败的情况下,将当前URL添加到注册失败的URL列表中 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 将当前URL添加到注册失败的URL列表中 addFailedRegistered(url); } } // ZookeeperRegistry @Override public void doRegister(URL url) { try { // 这里是真正的注册过程。需要注意的是这里的zkClient类型为ZookeeperClient,其是Dubbo对 // 真正使用的CuratorFramework的一个封装 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } 

        上面的代码中首先会对一些缓存数据进行清理,并且将当前URL添加到注册的URL列表中,然后将注册过程委托给ZookeeperClient进行。下面我们来看其是如何进行注册的:

@Override public void create(String path, boolean ephemeral) { // 判断创建的是否为临时节点,如果不是临时节点,则判断是否已经存在该节点,如果存在,则直接返回 if (!ephemeral) { if (checkExists(path)) { return; } } // 对path进行截取,因为最后一个"/"后面是被编码的URL对象,前面则是serviceKey + category // 这里的category指定的是provider还是consumer int i = path.lastIndexOf('/'); if (i > 0) { // 创建节点,需要注意的是,这里的create()方法进行的是递归调用,这是因为zookeeper创建节点时 // 只能一级一级的创建,因而其每次都是取"/"前面的一部分来创建,只有当前节点已经存在的情况下, // 上面的checkExists()才会为true,而且这里,由于zookeeper规定,除了叶节点以外,其余所有的 // 节点都必须为非临时节点,因而这里第二个参数传入的是false,这也是前面的if判断能通过的原因 create(path.substring(0, i), false); } if (ephemeral) { // 创建临时节点,具体的创建工作交由子类进行,也就是下面的代码 createEphemeral(path); } else { // 创建持久节点,具体的创建工作交由子类进行,也就是下面的代码 createPersistent(path); } } 
@Override public void createEphemeral(String path) { try { // 将临时节点的创建工作交由CuratorFramework进行 client.create().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } 
@Override public void createPersistent(String path) { try { // 将持久节点的创建工作交由CuratorFramework进行 client.create().forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } 

3. 小结

        本文主要讲解了Dubbo在导出服务时是如何进行服务暴露与注册的,并且具体讲解了如何基于netty进行服务的暴露,和如何基于zookeeper进行服务的注册。

原文链接:https://my.oschina.net/zhangxufeng/blog/3062739
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章