Dubbo分析之Exchange层
系列文章
Dubbo分析Serialize层
Dubbo分析之Transport层
Dubbo分析之Exchange 层
Dubbo分析之Protocol层
前言
紧接着上文Dubbo分析之Transport层,本文继续介绍Exchange层,此层官方介绍为信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer;下面分别进行介绍
Exchanger分析
Exchanger是此层的核心接口类,提供了connect()和bind()接口,分别返回ExchangeClient和ExchangeServer;dubbo提供了此接口的默认实现类HeaderExchanger,代码如下:
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
在实现类中在connect和bind中分别实例化了HeaderExchangeClient和HeaderExchangeServer,传入的参数是Transporters,可以认为这里就是Transport层的入口类;这里的ExchangeClient/ExchangeServer其实就是对Client/Server的包装,同时传入了自己的ChannelHandler;ChannelHandler已经在Transport层介绍过了,提供了连接建立,连接端口,发送请求,接受请求等接口;已默认使用的Netty为例,这里就是对NettyClient和NettyServer的包装,同时传入DecodeHandler,在NettyHandler中被调用;
ExchangeClient分析
ExchangeClient本身也继承于Client,同时也继承于ExchangeChannel:
public interface ExchangeClient extends Client, ExchangeChannel {
}
public interface ExchangeChannel extends Channel {
ResponseFuture request(Object request) throws RemotingException;
ResponseFuture request(Object request, int timeout) throws RemotingException;
ExchangeHandler getExchangeHandler();
@Override
void close(int timeout);
}
ExchangeChannel负责将上层的data包装成Request,然后发送给Transport层;具体的逻辑在HeaderExchangeChannel中:
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
创建了一个Request,在构造器中同时会产生一个RequestId;设置了协议版本,是否双向通信,最后设置了真实的业务数据;接下来实例化了一个DefaultFuture类,此类实现了同步转异步的方式,channel调用send发送请求之后,不需要等待结果,直接将DefaultFuture返回给上层,上层可以通过调用DefaultFuture的get方法来获取响应,get方法会阻塞等待获取服务器的响应才会返回;Client接收消息在handler里面,比如Netty在NettyHandler里面messageReceived方法介绍响应消息,NettyHandler最终会调用上面传入的DecodeHandler,DecodeHandler会先判断一下是否已经解码,如果解码就直接调用HeaderExchangeHandler,默认已经设置了编码解码器,所以会直接调用HeaderExchangeHandler里面的received方法:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
服务端和客户端都会使用此方法,这里是客户端接受的是Response,直接调用handleResponse方法:
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
接收到响应之后,再去告诉DefaultFuture已经收到响应,DefaultFuture本身存放了requestId对应DefaultFuture的一个ConcurrentHashMap;具体怎么映射过去,Response也包含一个responseId,此responseId和requestId是相同的;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
通过responseId获取了之前请求时创建的DefaultFuture,然后再更新DefaultFuture内部的response对象,更新完之后在调用Condition的signal方法,用户唤起通过DefaultFuture的get方法获取响应的阻塞线程:
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
可以发现阻塞要么被获取被signal方法唤醒,要么等待超时;以上大致是客户端发送获取响应的流程,下面看看服务器端流程
ExchangeServer分析
ExchangeServer继承于Server,同时提供了两个包装服务端Channel的方法
public interface ExchangeServer extends Server {
Collection<ExchangeChannel> getExchangeChannels();
ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}
服务器端主要用于接收Request消息,然后处理消息,最后把响应发送给客户端,相关接收消息已经在上面介绍过了,同样是在HeaderExchangeHandler里面的received方法中,只不过这里的消息类型为Request;
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
首先创建了一个Response,并且指定responseId为requestId,方便在客户端定位到具体的DefaultFuture;然后调用handler的reply方法处理消息,返回结果,如何处理的将在后面的protocol层介绍,大致就是通过Request的信息,反射调用Server端的服务,然后返回结果,然后将结果放入Response对象中,通过channel将消息发送客户端;
总结
本文介绍了Exchange层的大体流程,围绕Exchanger,ExchangeClient和ExchangeServer展开;请求封装成Request,响应封装成Response,客户端通过异步的方式接收服务器请求;
示例代码地址
https://github.com/ksfzhaohui...
https://gitee.com/OutOfMemory...

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
快速了解 mpvue 开发小程序
一、概念 mpvue是 美团 修改了 Vue.js 的 runtime 和 compiler 使其可以运行在小程序环境中,从而引入了整套 Vue.js 开发体验的小程序框架。 二、优化细节 1、实例生命周期 不同于vue的是我们会在小程序 onReady 后,再去触发 vue mounted 生命周期 除了 Vue 本身的生命周期外(详细的 vue 生命周期文档请看生命周期钩子),mpvue 还兼容了小程序生命周期,这部分生命周期钩子的来源于微信小程序的 Page, 除特殊情况外,不建议使用小程序的生命周期钩子。 2、模板语法 几乎全支持 官方文档:模板语法,但需要注意的是: (1)组件:由于要预编译出 wxml,只能使用单文件组件(.vue 组件)的形式进行支持,不支持:动态组件,异步组件,自定义 render,inline-template,X-Templates,<script type="text/x-template"> 字符串模版,Slot(scoped 暂时还没做支持)。 (2)不要在选项属性或回调上使用箭头函数,.eg: //箭头函数是和父级上下文绑定在一起...
-
下一篇
JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式
JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式,以及如何使用Eclips Memory Analyzer(MAT)插件进行堆内存分析。 方法一: jmap -dump:format=b,file=文件名 [pid] 例如: jmap -dump:format=b,file=/usr/local/base/02.hprof 12942 方法二: 让JVM在遇到OOM(OutOfMemoryError)时生成Dump文件,需要配置一些信息 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/usr/local/base 比如:我用eclipse配置一下。如下图所示: 方法三: 使用 jcmd 命令生成 dump 文件 jcmd <pid> GC.heap_dump d:\dump\heap.hprof 此方法没有经过博主的测试。 分析: dump文件可以通过MemoryAnalyzer(MAT)分析查看,可以查看dump时对象数量,内存占用,线程情况等。 我们现在来安装一下eclipse MA...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Docker安装Oracle12C,快速搭建Oracle学习环境
- 面试大杂烩
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS关闭SELinux安全模块
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2全家桶,快速入门学习开发网站教程
- MySQL数据库在高并发下的优化方案