dubbo 超时设置和源码分析
本文 dubbo 2.6.2
在工作中碰到一个业务接口时间比较长,需要修改超时时间,不知道原理,在网上搜索,看到有人说如果你觉得自己了解了dubbo的超时机制,那么问问自己以下问题:
超时是针对消费端还是服务端?
超时在哪设置?
超时设置的优先级是什么?
超时的实现原理是什么?
超时解决的是什么问题 ?
如果连这些都回答不上了,那只能说明还没有完全掌握 dubbo的超时机制。
于是索性就自己本地搭了个环境,研究了一下源码。 先来说一说结论:
1、超时是针对消费端的,消费端会抛出TimeoutException 而服务器端仅仅是一个 warn日志
2、超时在消费端、服务器端设置,dubbo会合并这两个设置
3、consumer方法级别 > provider 方法级别 > consumer 接口级别 > provider 接口级别 > consumer 全局级别 > provider 全局级别。如果都没配置,那么就是dubbo默认的1秒
4、见下面分析
5、最主要是宝贵的线程,客户端的用户线程不能因为服务端超时而一直类似wait, 导致无法正常响应其他业务。
一、超时时间设置
全局超时配置
<dubbo:consumer timeout="5000" />
指定接口以及特定方法超时配置
<dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade" timeout="6000"> <dubbo:method name="getQuestionById" timeout="7000"/> </dubbo:service>
观察控制台打印的注册URL:
consumer://172.16.71.30/me.kimi.samples.dubbo.facade.QuestionFacade?application=demo- consumer&category=providers,configurators,routers&check=false&default.proxy=jdk&default.timeout=5000& dubbo=2.6.2&getQuestionById.timeout=7000&interface=me.kimi.samples.dubbo.facade.QuestionFacade&logger= log4j&methods=getQuestionById&pid=13884&side=consumer&timeout=6000×tamp=1536630294523
可以看到:
default.timeout=5000
timeout=6000
getQuestionById.timeout=7000
分别对应了全局、类级别、方法级别的超时设置。
省略一部分调用链,最终会来到这里 DubboInvoker,读取超时时间:
com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 读取超时时间,这里dubbo已经把服务端的timeout参数和消费端的timeout参数合并 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); // 返回 DefaultFuture // get()在没返回值之前会 阻塞 await return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
看一下参数获取的方法:
public int getMethodParameter(String method, String key, int defaultValue) { // 首先查 getQuestionById.timeout String methodKey = method + "." + key; // 从数字缓存中先获取,不需要每次都 parseInt Number n = getNumbers().get(methodKey); if (n != null) { return n.intValue(); } // 没得话,去取字符串值 String value = getMethodParameter(method, key); if (value == null || value.length() == 0) { // 三个地方都没配置,返回默认值,默认是1秒 return defaultValue; } // 放入缓存中 int i = Integer.parseInt(value); getNumbers().put(methodKey, i); return i; }
public String getMethodParameter(String method, String key) { // 首先查 getQuestionById.timeout String value = parameters.get(method + "." + key); if (value == null || value.length() == 0) { // 没有设定方法级别的,去查接口级别或全局的 return getParameter(key); } return value; }
public String getParameter(String key) { // 接口级别去查 timeout String value = parameters.get(key); if (value == null || value.length() == 0) { // 没的话查询全局级别 default.timeout value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key); } return value; }
从代码中可以看出超时时间的设置:方法级别 > 接口级别 > 全局级别。
这里要特殊提一点,就是dubbo会合并服务端客户端的设置。
修改客户端配置, 只留下全局设置:
<dubbo:consumer timeout="2000" proxy="jdk"/> <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade"/>
服务端配置如下:
<dubbo:provider timeout="10000" accepts="500"/> <!-- service implementation, as same as regular local bean --> <bean id="questionFacade" class="me.kimi.samples.dubbo.provider.service.QuestionFacadeImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade" timeout="9000"/>
最后在客户端调用的时候,发现timeout是9000ms, debug发现客户端合并了url, 合并结果如下:
dubbo://172.16.71.30:20880/me.kimi.samples.dubbo.facade.QuestionFacade?anyhost=true&application=demo- provider&default.accepts=500&default.timeout=10000&dubbo=2.6.2&generic=false&interface=me.kimi.samples. dubbo.facade.QuestionFacade&logger=log4j&methods=getQuestionById&pid=17508&side=provider&timeout=9000 ×tamp=1536660132286
查看源码 com.alibaba.dubbo.registry.integration.RegistryDirectory#mergeUrl:
private URL mergeUrl(URL providerUrl) { providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters List<Configurator> localConfigurators = this.configurators; // local reference if (localConfigurators != null && !localConfigurators.isEmpty()) { for (Configurator configurator : localConfigurators) { providerUrl = configurator.configure(providerUrl); } } providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker! // 这里就是合并服务器端的参数,所以除了timeout参数,其他很多参数也是这样的 // 即已客户端优先 this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0) && "dubbo".equals(providerUrl.getProtocol())) { // Compatible version 1.0 //fix by tony.chenl DUBBO-44 String path = directoryUrl.getParameter(Constants.INTERFACE_KEY); if (path != null) { int i = path.indexOf('/'); if (i >= 0) { path = path.substring(i + 1); } i = path.lastIndexOf(':'); if (i >= 0) { path = path.substring(0, i); } providerUrl = providerUrl.setPath(path); } } return providerUrl; }
所以综合,超时时间的优先级为:
consumer方法级别 > provider 方法级别 > consumer 接口级别 > provider 接口级别 > consumer 全局级别 > provider 全局级别。
二、超时实现
有了超时时间,那么dubbo是怎么实现超时的呢?
再看上面的DubboInvoker,对于一般的有返回值的调用,最终调用:
return (Result) currentClient.request(inv, timeout).get();
先看一下request方法,来到 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel的Request方法:
@Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
重点是 DefaultFuture:
static { Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); th.setDaemon(true); th.start(); }
类加载的时候会启动一个超时扫描线程:
public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 每个 DefaultFuture 都有一个 id, 对应当前请求id, 然后被放到 静态Map中。 FUTURES.put(id, this); // id 对应的 Channel 也存起来,后续超时需要处理 CHANNELS.put(id, channel); }
再看下get方法:
@Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { // 这里可以看到在调用的时候需要等待 done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } // 处理返回值 // 线程扫描超时,正常返回都在这里 return returnFromResponse(); }
从上面代码上可以看到,get方法,会使当前线程挂起等待。那么什么时候会被恢复呢,可以想到两类情况:
1、超时
2、服务端正常返回
那么回过头来看看超时扫描线程,看一下扫描线程做了什么事情:
private static class RemotingInvocationTimeoutScan implements Runnable { @Override public void run() { while (true) { try { // 就是去扫描DefaultFuture列表 for (DefaultFuture future : FUTURES.values()) { if (future == null || future.isDone()) { continue; } // 如果future未完成,且超时 if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { // 创建一个异常的Response Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // 处理异常 DefaultFuture.received(future.getChannel(), timeoutResponse); } } Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } }
看下 received方法
public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
private void doReceived(Response res) { lock.lock(); try { // 设置响应 // 这样isDone就是true了 response = res; if (done != null) { // 恢复挂起的线程 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
显然这里扫描线程把用户请求线程恢复了。 恢复以后,顺着刚才的 DefaultFuture 的get方法,来到 returnFromResponse方法:
private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } // 正常返回,返回 Result 对象 if (res.getStatus() == Response.OK) { return res.getResult(); } // 超时处理 if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 重新抛出异常 throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); }
超时扫描线程,构建了一个 超时 Response, 在这里抛出 超时异常。
超时抛异常是看见了,那么正常返回是怎么处理的呢,因为 done还 await在那里。 这里暂时不细说dubbo其他组件的原理,只要知道在网络事件完成(即服务器端在规定时间内正常返回)的时候,会有个回调,在整个回调过程中,最终会回调到 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler 的 received 方法,看下代码:
@Override public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { // 请求会回调到这里 handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
处理响应:
static void handleResponse(Channel channel, Response response) throws RemotingException { // 不是心跳包,是正常的业务返回 if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
这里看到,最终调用也是 DefaultFuture.received 的方法,和超时扫描的入口一样, 最终会恢复用户请求线程。唯一有区别的就是,这里是一个ok的Response, 而那边是timeout的response.
参考内容:https://www.roncoo.com/course/list.html?courseName=Dubbo

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Spring Boot 定制与优化内置的Tomcat容器
1、Spring Boot 定制与优化内置Tomcat容器。 > 内置的容器有三个分别是Undertow、Jetty、Tomcat,Spring Boot 对这三个容器分别进行了实现,它们上层接口都是EmbeddedServletContainerFactory,该接口也是本文的主要核心. 对于内置容器的定制与优化主要有两种方式,第一种方式是通过配置文件来配置,另外一种是通过码代码的方式.接下来主要对上述两种方式进行实现。 2、通过配置文件来定制与优化Tomcat > 配置的核心内容参考org.springframework.boot.autoconfigure.web.ServerProperties这个服务属性类,下面展示部分对tomcat的配置 server: port:8081 #tomcat设置 tomcat: accesslog: #开启日志访问 enabled:true #日志保存路径 directory:e:/tmp/logs 更多的配置内容参考org.springframework.boot.autoconfigure.web.ServerProperti...
- 下一篇
基于Dubbo的分布式系统架构实战
安装Dubbo注册中心(Zookeeper-3.4.6) 安装Dubbo管理控制台 Tomcat中部署web应用 ---- Dubbo服务消费者Web应用war包的部署 Dubbo监控中心的介绍与简易监控中心的安装( 补充文档 ) SVN版本管理系统的安装 CentOS + Subversion + Apache + Jsvnadmin Maven私有库和本地库的安装与配置 Sonatype Nexus + Maven 持续集成篇-- SonarQube代码质量管理平台的安装 持续集成篇-- SonarQube代码质量管理平台的配置与使用 持续集成篇 --Hudson持续集成服务器的安装配置与使用 分布式架构实战--ActiveMQ的安装与使用(单节点) Redis的安装与使用(单节点) Dubbo分布式架构实战--FastDFS分布式文件系统的安装与使用(单节点) 分布式架构--简易版支付系统介绍 分布式系统架构实战--简易版支付系统部署(单节点) 分布式架构--Dubbo服务启动依赖检查 分布式架构--Dubbo负载均衡策略 Dubbo线程模型(结合Linux线程数限制配置的实战经...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7设置SWAP分区,小内存服务器的救世主
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- MySQL8.0.19开启GTID主从同步CentOS8
- Hadoop3单机部署,实现最简伪集群
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题