您现在的位置是:首页 > 文章详情

dubbo 超时设置和源码分析

日期:2018-11-29点击:585

本文 dubbo 2.6.2

在工作中碰到一个业务接口时间比较长,需要修改超时时间,不知道原理,在网上搜索,看到有人说如果你觉得自己了解了dubbo的超时机制,那么问问自己以下问题:


  • 超时是针对消费端还是服务端?

  • 超时在哪设置?

  • 超时设置的优先级是什么?

  • 超时的实现原理是什么?

  • 超时解决的是什么问题 ?


如果连这些都回答不上了,那只能说明还没有完全掌握 dubbo的超时机制。


于是索性就自己本地搭了个环境,研究了一下源码。 先来说一说结论:


1、超时是针对消费端的,消费端会抛出TimeoutException 而服务器端仅仅是一个 warn日志

2、超时在消费端、服务器端设置,dubbo会合并这两个设置

3、consumer方法级别 > provider 方法级别 > consumer 接口级别 > provider 接口级别 > consumer 全局级别 > provider 全局级别。如果都没配置,那么就是dubbo默认的1秒

4、见下面分析

5、最主要是宝贵的线程,客户端的用户线程不能因为服务端超时而一直类似wait, 导致无法正常响应其他业务。


一、超时时间设置

全局超时配置

<dubbo:consumer timeout="5000" />

指定接口以及特定方法超时配置

    <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade" timeout="6000">         <dubbo:method name="getQuestionById" timeout="7000"/>     </dubbo:service>

观察控制台打印的注册URL:

consumer://172.16.71.30/me.kimi.samples.dubbo.facade.QuestionFacade?application=demo- consumer&category=providers,configurators,routers&check=false&default.proxy=jdk&default.timeout=5000& dubbo=2.6.2&getQuestionById.timeout=7000&interface=me.kimi.samples.dubbo.facade.QuestionFacade&logger= log4j&methods=getQuestionById&pid=13884&side=consumer&timeout=6000&timestamp=1536630294523

可以看到:


  • default.timeout=5000

  • timeout=6000

  • getQuestionById.timeout=7000


分别对应了全局、类级别、方法级别的超时设置。


省略一部分调用链,最终会来到这里 DubboInvoker,读取超时时间:


com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker

    @Override     protected Result doInvoke(final Invocation invocation) throws Throwable {         RpcInvocation inv = (RpcInvocation) invocation;         final String methodName = RpcUtils.getMethodName(invocation);         inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());         inv.setAttachment(Constants.VERSION_KEY, version);         ExchangeClient currentClient;         if (clients.length == 1) {             currentClient = clients[0];         } else {             currentClient = clients[index.getAndIncrement() % clients.length];         }         try {             boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);             // 读取超时时间,这里dubbo已经把服务端的timeout参数和消费端的timeout参数合并             int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);             if (isOneway) {                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);                 currentClient.send(inv, isSent);                 RpcContext.getContext().setFuture(null);                 return new RpcResult();             } else if (isAsync) {                 ResponseFuture future = currentClient.request(inv, timeout);                 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));                 return new RpcResult();             } else {                 RpcContext.getContext().setFuture(null);                 // 返回 DefaultFuture                 // get()在没返回值之前会 阻塞 await                 return (Result) currentClient.request(inv, timeout).get();             }         } catch (TimeoutException e) {             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);         } catch (RemotingException e) {             throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);         }     }

看一下参数获取的方法:

public int getMethodParameter(String method, String key, int defaultValue) {     // 首先查 getQuestionById.timeout     String methodKey = method + "." + key;     // 从数字缓存中先获取,不需要每次都 parseInt     Number n = getNumbers().get(methodKey);     if (n != null) {         return n.intValue();     }     // 没得话,去取字符串值     String value = getMethodParameter(method, key);     if (value == null || value.length() == 0) {         // 三个地方都没配置,返回默认值,默认是1秒         return defaultValue;     }     // 放入缓存中     int i = Integer.parseInt(value);     getNumbers().put(methodKey, i);     return i; }
public String getMethodParameter(String method, String key) {     // 首先查 getQuestionById.timeout     String value = parameters.get(method + "." + key);     if (value == null || value.length() == 0) {         // 没有设定方法级别的,去查接口级别或全局的         return getParameter(key);     }     return value; }
public String getParameter(String key) {     // 接口级别去查 timeout      String value = parameters.get(key);     if (value == null || value.length() == 0) {         // 没的话查询全局级别 default.timeout         value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key);     }     return value; }

从代码中可以看出超时时间的设置:方法级别 > 接口级别 > 全局级别。


这里要特殊提一点,就是dubbo会合并服务端客户端的设置。


修改客户端配置, 只留下全局设置:

<dubbo:consumer timeout="2000" proxy="jdk"/>     <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade"/>

服务端配置如下:

    <dubbo:provider timeout="10000" accepts="500"/>     <!-- service implementation, as same as regular local bean -->     <bean id="questionFacade" class="me.kimi.samples.dubbo.provider.service.QuestionFacadeImpl"/>     <!-- declare the service interface to be exported -->     <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade" timeout="9000"/>

最后在客户端调用的时候,发现timeout是9000ms, debug发现客户端合并了url, 合并结果如下:

dubbo://172.16.71.30:20880/me.kimi.samples.dubbo.facade.QuestionFacade?anyhost=true&application=demo- provider&default.accepts=500&default.timeout=10000&dubbo=2.6.2&generic=false&interface=me.kimi.samples. dubbo.facade.QuestionFacade&logger=log4j&methods=getQuestionById&pid=17508&side=provider&timeout=9000 &timestamp=1536660132286

查看源码 com.alibaba.dubbo.registry.integration.RegistryDirectory#mergeUrl:

private URL mergeUrl(URL providerUrl) {     providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters     List<Configurator> localConfigurators = this.configurators; // local reference     if (localConfigurators != null && !localConfigurators.isEmpty()) {         for (Configurator configurator : localConfigurators) {             providerUrl = configurator.configure(providerUrl);         }     }     providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!     // 这里就是合并服务器端的参数,所以除了timeout参数,其他很多参数也是这样的     // 即已客户端优先     this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters());      if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0)             && "dubbo".equals(providerUrl.getProtocol())) { // Compatible version 1.0         //fix by tony.chenl DUBBO-44         String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);         if (path != null) {             int i = path.indexOf('/');             if (i >= 0) {                 path = path.substring(i + 1);             }             i = path.lastIndexOf(':');             if (i >= 0) {                 path = path.substring(0, i);             }             providerUrl = providerUrl.setPath(path);         }     }     return providerUrl; }

所以综合,超时时间的优先级为:

consumer方法级别 > provider 方法级别 > consumer 接口级别 > provider 接口级别 > consumer 全局级别 > provider 全局级别。


二、超时实现

有了超时时间,那么dubbo是怎么实现超时的呢?


再看上面的DubboInvoker,对于一般的有返回值的调用,最终调用:

return (Result) currentClient.request(inv, timeout).get();

先看一下request方法,来到 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel的Request方法:

    @Override     public ResponseFuture request(Object request, int timeout) throws RemotingException {         if (closed) {             throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");         }         // create request.         Request req = new Request();         req.setVersion("2.0.0");         req.setTwoWay(true);         req.setData(request);                 DefaultFuture future = new DefaultFuture(channel, req, timeout);         try {             channel.send(req);         } catch (RemotingException e) {             future.cancel();             throw e;         }         return future;     }

重点是 DefaultFuture:

static {     Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");     th.setDaemon(true);     th.start(); }

类加载的时候会启动一个超时扫描线程:

public DefaultFuture(Channel channel, Request request, int timeout) {     this.channel = channel;     this.request = request;     this.id = request.getId();     this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);     // 每个 DefaultFuture 都有一个 id, 对应当前请求id, 然后被放到 静态Map中。     FUTURES.put(id, this);     // id 对应的 Channel 也存起来,后续超时需要处理     CHANNELS.put(id, channel); }

再看下get方法:

@Override public Object get() throws RemotingException {     return get(timeout); } @Override public Object get(int timeout) throws RemotingException {     if (timeout <= 0) {         timeout = Constants.DEFAULT_TIMEOUT;     }     if (!isDone()) {         long start = System.currentTimeMillis();         lock.lock();         try {             while (!isDone()) {                 // 这里可以看到在调用的时候需要等待                 done.await(timeout, TimeUnit.MILLISECONDS);                 if (isDone() || System.currentTimeMillis() - start > timeout) {                     break;                 }             }         } catch (InterruptedException e) {             throw new RuntimeException(e);         } finally {             lock.unlock();         }                  if (!isDone()) {             throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));         }     }     // 处理返回值     // 线程扫描超时,正常返回都在这里     return returnFromResponse(); }

从上面代码上可以看到,get方法,会使当前线程挂起等待。那么什么时候会被恢复呢,可以想到两类情况:


1、超时

2、服务端正常返回


那么回过头来看看超时扫描线程,看一下扫描线程做了什么事情:

private static class RemotingInvocationTimeoutScan implements Runnable {         @Override         public void run() {             while (true) {                 try {                     // 就是去扫描DefaultFuture列表                     for (DefaultFuture future : FUTURES.values()) {                         if (future == null || future.isDone()) {                             continue;                         }                         // 如果future未完成,且超时                         if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {                             // 创建一个异常的Response                             Response timeoutResponse = new Response(future.getId());                             // set timeout status.                             timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);                             timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));                             // 处理异常                             DefaultFuture.received(future.getChannel(), timeoutResponse);                         }                     }                     Thread.sleep(30);                 } catch (Throwable e) {                     logger.error("Exception when scan the timeout invocation of remoting.", e);                 }             }         }     }

看下 received方法

public static void received(Channel channel, Response response) {     try {         DefaultFuture future = FUTURES.remove(response.getId());         if (future != null) {             future.doReceived(response);         } else {             logger.warn("The timeout response finally returned at "                     + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))                     + ", response " + response                     + (channel == null ? "" : ", channel: " + channel.getLocalAddress()                     + " -> " + channel.getRemoteAddress()));         }     } finally {         CHANNELS.remove(response.getId());     } }
private void doReceived(Response res) {     lock.lock();     try {         // 设置响应         // 这样isDone就是true了         response = res;         if (done != null) {             // 恢复挂起的线程              done.signal();         }     } finally {         lock.unlock();     }     if (callback != null) {         invokeCallback(callback);     } }

显然这里扫描线程把用户请求线程恢复了。 恢复以后,顺着刚才的 DefaultFuture 的get方法,来到 returnFromResponse方法:

private Object returnFromResponse() throws RemotingException {     Response res = response;     if (res == null) {         throw new IllegalStateException("response cannot be null");     }     // 正常返回,返回 Result 对象     if (res.getStatus() == Response.OK) {         return res.getResult();     }     // 超时处理     if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {         // 重新抛出异常         throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());     }     throw new RemotingException(channel, res.getErrorMessage()); }

超时扫描线程,构建了一个 超时 Response, 在这里抛出 超时异常。


超时抛异常是看见了,那么正常返回是怎么处理的呢,因为 done还 await在那里。 这里暂时不细说dubbo其他组件的原理,只要知道在网络事件完成(即服务器端在规定时间内正常返回)的时候,会有个回调,在整个回调过程中,最终会回调到 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler 的 received 方法,看下代码:

@Override public void received(Channel channel, Object message) throws RemotingException {     channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());     ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);     try {         if (message instanceof Request) {             // handle request.             Request request = (Request) message;             if (request.isEvent()) {                 handlerEvent(channel, request);             } else {                 if (request.isTwoWay()) {                     Response response = handleRequest(exchangeChannel, request);                     channel.send(response);                 } else {                     handler.received(exchangeChannel, request.getData());                 }             }         } else if (message instanceof Response) {             // 请求会回调到这里             handleResponse(channel, (Response) message);         } else if (message instanceof String) {             if (isClientSide(channel)) {                 Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());                 logger.error(e.getMessage(), e);             } else {                 String echo = handler.telnet(channel, (String) message);                 if (echo != null && echo.length() > 0) {                     channel.send(echo);                 }             }         } else {             handler.received(exchangeChannel, message);         }     } finally {         HeaderExchangeChannel.removeChannelIfDisconnected(channel);     } }

处理响应:

static void handleResponse(Channel channel, Response response) throws RemotingException {     // 不是心跳包,是正常的业务返回     if (response != null && !response.isHeartbeat()) {         DefaultFuture.received(channel, response);     } }

这里看到,最终调用也是 DefaultFuture.received 的方法,和超时扫描的入口一样, 最终会恢复用户请求线程。唯一有区别的就是,这里是一个ok的Response, 而那边是timeout的response.


参考内容:https://www.roncoo.com/course/list.html?courseName=Dubbo


原文链接:https://blog.roncoo.com/article/134045
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章