public class DubboInvoker<T> extends AbstractInvoker<T> { ... @Override protected Result doInvoke(final Invocation invocation) throws Throwable { ... return (Result) currentClient.request(inv, timeout).get(); //currentClient.request(inv, timeout)返回了一个DefaultFuture } ... }
我们可以看一下这个DefaultFuture的实现,
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
// invoke id. private final long id; //Dubbo请求的id,每个消费者都是一个从0开始的long类型 private final Channel channel; private final Request request; private final int timeout; private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private final long start = System.currentTimeMillis(); private volatile long sent; private volatile Response response; private volatile ResponseCallback callback; public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); //等待时以id为key把Future放入全局的Future Map中,这样回复数据回来了可以根据id找到对应的Future通知主线程 CHANNELS.put(id, channel); }
public void run() { for (;;) { .... SelectorUtil.select(selector);
proce***egisterTaskQueue(); processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); //再处理select事件,读写都可能有 .... } } private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { if (!read(k)) { // Connection already closed - no need to handle write. continue; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k); } } catch (CancelledKeyException e) { close(k); }
if (cleanUpCancelledKeys()) { break; // break the loop to avoid ConcurrentModificationException } } } private boolean read(SelectionKey k) { ......
// Fire the event. fireMessageReceived(channel, buffer); //读取完后,最终会调用这个函数,发送一个收到信息的事件 ......
}
7.IO线程把请求交给Dubbo线程池
按配置不同,走的Handler不同,配置dispatch为all,走的handler如下。下面IO线程直接交给一个ExecutorService来处理这个请求,出现了熟悉的报错“Threadpool is exhausted",业务线程池满时,如果没有队列,就会报这个错。
public class AllChannelHandler extends WrappedChannelHandler { ...... public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring //fix The thread pool is full, refuses to call, does not return, and causes the consumer to waitfor time out if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } ...... }
8.服务端Dubbo线程池处理完请求后,把返回报文放入队列
线程池会调起下面的函数
public class HeaderExchangeHandler implements ChannelHandlerDelegate { ...... Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); ...... // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); //真正的业务逻辑类 res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
public void received(Channel channel, Object message) throws RemotingException { ......
for (int i = writeSpinCount; i > 0; i --) { localWrittenBytes = buf.transferTo(ch); if (localWrittenBytes != 0) { writtenBytes += localWrittenBytes; break; } if (buf.finished()) { break; } }
if (buf.finished()) { // Successful write - proceed to the next message. buf.release(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; evt = null; buf = null; future.setSuccess(); } else { // Not written fully - perhaps the kernel buffer is full. addOpWrite = true; channel.writeSuspended = true;
(base) niuxinli@ubuntu:~$ netstat -nat Active Internet connections (servers and established) Proto Recv-Q Send-Q Local Address Foreign Address State tcp 0 0 0.0.0.0:20880 0.0.0.0:* LISTEN tcp 0 36 192.168.1.7:22 192.168.1.4:58931 ESTABLISHED tcp 0 0 192.168.1.7:36666 192.168.1.7:2181 ESTABLISHED tcp 0 65160 192.168.1.7:20880 192.168.1.5:60760 ESTABLISHED
可以看以下Recv-Q和Send-Q的具体含义:
Recv-Q Established: The count of bytes not copied by the user program connected to this socket.
Send-Q Established: The count of bytes not acknowledged by the remote host.
Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。